@@ -2,6 +2,8 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;

@@ -21,9 +23,9 @@ public class HelpMethods {
public static HashMap<String, Integer> attributeCalendarIndex;
public static HashMap<String, Integer> attributeReviewIndex;


//Parsing the price to a double
public static Double priceAsDouble(String s){
public static Double stringToDouble(String s){
String price = "";
for (int i = 0; i < s.length(); i++) {
if (s.charAt(i) != ',' && s.charAt(i) != '$'){
@@ -35,7 +37,7 @@ public static Double priceAsDouble(String s){
}
return Double.parseDouble(price);
}

public static String numberOfBedsToRoomType(String numberOfBeds){
if (numberOfBeds.isEmpty()){
return "Room for 0 persons";
@@ -47,7 +49,7 @@ public static String numberOfBedsToRoomType(String numberOfBeds){
return "Room for " + number + " persons";
}


//The indexes for each attribute
public static void mapAttributeAndIndex(JavaRDD<String> input, char csvFile){
HashMap<String, Integer> attributeIndex = new HashMap<String, Integer>();
@@ -84,19 +86,19 @@ else if (csvFile == 'c'){
}
attriButesToReturn[i] = entireRow[index];
}
// System.out.println(Arrays.toString(attriButesToReturn));
// System.out.println(Arrays.toString(attriButesToReturn));
return attriButesToReturn;
}
});
//Removes the header of the inpuRDD
Function2 removeHeader= new Function2<Integer, Iterator<String[]>, Iterator<String[]>>(){
public Iterator<String[]> call(Integer ind, Iterator<String[]> iterator) throws Exception {
if(ind==0 && iterator.hasNext()){
iterator.next();
return iterator;
}else
return iterator;
}
public Iterator<String[]> call(Integer ind, Iterator<String[]> iterator) throws Exception {
if(ind==0 && iterator.hasNext()){
iterator.next();
return iterator;
}else
return iterator;
}
};
// System.out.println(result.count());
JavaRDD<String[]> resultAfterCuttingHead = result.mapPartitionsWithIndex(removeHeader, true);
@@ -109,7 +111,12 @@ public static Function2<ArrayList<City>, String[], ArrayList<City>> addAndCountC

public ArrayList<City> call(ArrayList<City> cityList, String[] cityInfo)
throws Exception {
City city = new City(cityInfo[0], priceAsDouble(cityInfo[1]), cityInfo[2], priceAsDouble(cityInfo[3]), priceAsDouble(cityInfo[4]), priceAsDouble(cityInfo[5]), cityInfo[6], priceAsDouble(cityInfo[7]));
String cityName = cityInfo[0];
double price = stringToDouble(cityInfo[1]);
String room_type = cityInfo[2];
double reviews_per_month = stringToDouble(cityInfo[3]);

City city = new City(cityName, price, room_type, reviews_per_month);
if (cityExistsInCityList(cityList, city)){
updateCity(cityList, city);
}
@@ -121,7 +128,7 @@ public ArrayList<City> call(ArrayList<City> cityList, String[] cityInfo)
};
}

public static Function2<ArrayList<City>, ArrayList<City>, ArrayList<City>> combineCity(){
public static Function2<ArrayList<City>, ArrayList<City>, ArrayList<City>> combineCityLists(){
return new Function2<ArrayList<City>, ArrayList<City>, ArrayList<City>>() {
public ArrayList<City> call(ArrayList<City> cityList1, ArrayList<City> cityList2)
throws Exception {
@@ -137,7 +144,7 @@ public ArrayList<City> call(ArrayList<City> cityList1, ArrayList<City> cityList2
}
};
}


public static boolean cityExistsInCityList(ArrayList<City> cityList, City cityToCheck){
for (City city : cityList) {
@@ -147,154 +154,375 @@ public static boolean cityExistsInCityList(ArrayList<City> cityList, City cityTo
}
return false;
}

public static void updateCity(ArrayList<City> cityList, City city){
for (City cityToUpdate : cityList) {
if (cityToUpdate.getName().equals(city.getName())){
cityToUpdate.updateParameters(city);
}
}
}

public static void addNewCity(ArrayList<City> cityList, City city){
cityList.add(city);
}


//FOR LISTINGLIST
public static Function2<ArrayList<Listing>, String[], ArrayList<Listing>> addAndCountListing(){
return new Function2<ArrayList<Listing>, String[], ArrayList<Listing>>() {

public ArrayList<Listing> call(ArrayList<Listing> listingList, String[] listingInfo)
throws Exception {
Listing listing = new Listing(priceAsDouble(listingInfo[0]), listingInfo[1]);
if (listingExistsInListingList(listingList, listing)){
updateListing(listingList, listing);
}
else{
addNewListing(listingList, listing);
}
return listingList;
}
};
}
public static JavaPairRDD<String, String[]> mapToPair(JavaRDD<String> input, final String[] key, final String[] columns, final HashMap<String, Integer> attributeList){

public static Function2<ArrayList<Listing>, ArrayList<Listing>, ArrayList<Listing>> combineListing(){
return new Function2<ArrayList<Listing>, ArrayList<Listing>, ArrayList<Listing>>() {
public ArrayList<Listing> call(ArrayList<Listing> listingList1, ArrayList<Listing> listingList2)
throws Exception {
for (Listing listing2 : listingList2) {
if (listingExistsInListingList(listingList1, listing2)){
updateListing(listingList1, listing2);
}
else{
addNewListing(listingList1, listing2);
}
}
return listingList1;
}
};
}

public static boolean listingExistsInListingList(ArrayList<Listing> listingList, Listing listingToCheck){
for (Listing listing : listingList) {
if (listing.getId() == listingToCheck.getId()){
return true;
}
}
return false;
}

public static void updateListing(ArrayList<Listing> listingList, Listing listing){
for (Listing listingToUpdate : listingList) {
if (listingToUpdate.getId() == listing.getId()){
listingToUpdate.updateAvailable(listing);
}
}
}

public static void addNewListing(ArrayList<Listing> listingList, Listing listing){
listingList.add(listing);
}

public static void calculateTop3HostsWithHighestIncomeForEachCity(ArrayList<City> cityList, ArrayList<Listing> listingList){
for (City city : cityList) {
for (Host host : city.getHostList()) {
for (Listing listing : listingList) {
if (host.getListingIDs().containsKey((Integer) listing.getId())){
host.updateTotalIncome(listing.getId(), listing.getNumberOfNotAvailable());
}
}
}
city.updateTop3Hosts();
}
}

public static JavaPairRDD<String, String[]> mapToPair(JavaRDD<String> input, final String key, final String[] columns, final HashMap<String, Integer> attributeList){
PairFunction<String, String, String[]> keyData = new PairFunction<String, String, String[]>() {

public Tuple2<String, String[]> call(String s) throws Exception {
String[] entireRow = s.split("\t");
String[] attriButesToReturn = new String[columns.length];
int keyIndex = attributeList.get(key);
String keyToReturn = "";
for (int i = 0; i < key.length; i++) {
keyToReturn += entireRow[attributeList.get(key[i])];
if (i != key.length-1){
keyToReturn += " ";
}
}
for (int i = 0; i < columns.length; i++) {
int index = attributeList.get(columns[i]);
attriButesToReturn[i] = entireRow[index];
}
return new Tuple2(entireRow[keyIndex], attriButesToReturn);
return new Tuple2(keyToReturn, attriButesToReturn);
}
};
//Removes the header
Function2 removeHeader= new Function2<Integer, Iterator<String[]>, Iterator<String[]>>(){
public Iterator<String[]> call(Integer ind, Iterator<String[]> iterator) throws Exception {
if(ind==0 && iterator.hasNext()){
iterator.next();
return iterator;
}else
return iterator;
}
JavaPairRDD<String, String[]> listingPairs = input.mapToPair(keyData);

JavaPairRDD<String, String[]> listingPairsWithoutHeader = listingPairs.filter(new Function<Tuple2<String, String[]>, Boolean>(){
public Boolean call(Tuple2<String, String[]> v1) throws Exception {
for (int i = 0; i < columns.length; i++) {
if (!columns[i].equals(v1._2[i])){
return true;
}
}
return false;
}

});
return listingPairsWithoutHeader;
}

public static JavaPairRDD<String, String[]> mapToPairNewKey(JavaPairRDD<String, String[]> input, final int keyIndex){

PairFunction<Tuple2<String, String[]>, String, String[]> keyData = new PairFunction<Tuple2<String, String[]>, String, String[]>() {

public Tuple2<String, String[]> call(Tuple2<String, String[]> t)
throws Exception {
String[] attriButesToReturn = t._2;
return new Tuple2(attriButesToReturn[keyIndex], attriButesToReturn);
}
};
JavaPairRDD<String, String[]> listingPairs = input.mapToPair(keyData);
listingPairs.mapPartitionsWithIndex(removeHeader, true);
return listingPairs;
}

//Joining pair2 into pair1. pair1.leftouterJoin(Pair2).
public static JavaPairRDD<String, String[]> lefOuterJoin(JavaPairRDD<String, String[]> pair1, JavaPairRDD<String, String[]> pair2){
JavaPairRDD<String, Tuple2<String[], Optional<String[]>>> joinedPair = pair1.leftOuterJoin(pair2);

// joinedPair.foreach(new VoidFunction<Tuple2<String, Tuple2<String[], Optional<String[]>>>>(){
//
// public void call(
// Tuple2<String, Tuple2<String[], Optional<String[]>>> t)
// throws Exception {
// System.out.println("t1: " +Arrays.toString(t._2._1));
// System.out.println("t2: " +Arrays.toString(t._2._2.get()));
// }
// });



JavaPairRDD<String, String[]> joinedAndReducedPair = joinedPair
.mapToPair(new PairFunction<Tuple2<String, Tuple2<String[], Optional<String[]>>>, String, String[]>() {
public Tuple2<String, String[]> call(
Tuple2<String, Tuple2<String[], Optional<String[]>>> t)
throws Exception {
if (t._2._2.isPresent()){
String[] newTuple = new String[t._2._1.length+t._2._2.get().length];
String[] newTuple = new String[t._2._1.length+t._2._2.get().length-1];
for (int i = 0; i < t._2._1.length; i++) {
newTuple[i] = t._2._1[i];
}
for (int j = t._2._1.length; j < newTuple.length; j++) {
//make sure to not duplicate the key in the tupple
if (t._2._2.get()[j-t._2._1.length].equals(t._1)){
if (t._2._2.get()[j-t._2._1.length+1].equals("t")){
newTuple[j] = "0";
}
else if (t._2._2.get()[j-t._2._1.length+1].equals("f")){
newTuple[j] = "1";
}
else{
newTuple[j] = t._2._2.get()[j-t._2._1.length+1];
}
continue;
}
newTuple[j] = t._2._2.get()[j-t._2._1.length];
}
return new Tuple2<String, String[]>(t._1(), newTuple);
return new Tuple2<String, String[]>(t._1, newTuple);
}
else{
return new Tuple2<String, String[]>(t._1(), null);
return new Tuple2<String, String[]>(t._1, t._2._1);
}
}
});
return joinedAndReducedPair;
}

public static JavaPairRDD<String, String[]> reduceByKeyOnAvailable(JavaPairRDD<String, String[]> joinedPair){


JavaPairRDD<String, String[]> joinedPairReducedByKey = joinedPair.reduceByKey(new Function2<String[], String[], String[]>(){
public String[] call(String[] v1, String[] v2) {
int numberOfNotAvailable1 = Integer.parseInt(v1[v1.length-1]);
int numberOfNotAvailable2 = Integer.parseInt(v2[v2.length-1]);
int sum = numberOfNotAvailable1 + numberOfNotAvailable2;
v1[v1.length-1] = ""+sum;
// System.out.println(Arrays.toString(v1));
return v1;
}
});
// System.out.println(joinedPair.count());
return joinedPairReducedByKey;
}

public static JavaPairRDD<String, String[]> reduceByKeyOnHostId(JavaPairRDD<String, String[]> joinedPair){
JavaPairRDD<String, String[]> joinedPairReducedByKey = joinedPair.reduceByKey(new Function2<String[], String[], String[]>(){
public String[] call(String[] v1, String[] v2) {
return v1;
}
});
// System.out.println(joinedPair.count());
return joinedPairReducedByKey;
}

public static Function2<ArrayList<City>, Tuple2<String, String[]>, ArrayList<City>> addAndCountCityAndHost(){
return new Function2<ArrayList<City>, Tuple2<String, String[]>, ArrayList<City>>(){

public ArrayList<City> call(ArrayList<City> cityList,
Tuple2<String, String[]> keyValuePair) throws Exception {
// System.out.println(Arrays.toString(keyValuePair._2));
String cityName = keyValuePair._2[0];
double price = stringToDouble(keyValuePair._2[1]);
double hostID = stringToDouble(keyValuePair._2[3]);
String hostName = keyValuePair._2[4];
double totalListingsForHost = stringToDouble(keyValuePair._2[5]);
double numberOfNightsNotAvailable = 0;
if (keyValuePair._2.length > 6){
numberOfNightsNotAvailable = stringToDouble(keyValuePair._2[6]);
}
City city = new City(cityName, price, hostID, hostName, totalListingsForHost, numberOfNightsNotAvailable);
if (cityExistsInCityList(cityList, city)){
updateCity(cityList, city);
}
else{
addNewCity(cityList, city);
}
return cityList;
};
};
}

public static Function2<ArrayList<City>, Tuple2<String, String[]>, ArrayList<City>> addAndCountCityAndReviewers(){
return new Function2<ArrayList<City>, Tuple2<String, String[]>, ArrayList<City>>(){

public ArrayList<City> call(ArrayList<City> cityList,
Tuple2<String, String[]> keyValuePair) throws Exception {
// System.out.println(Arrays.toString(keyValuePair._2));
double reviewer_id = stringToDouble(keyValuePair._2[1]);
String reviewer_name = keyValuePair._2[2];
String cityName = keyValuePair._2[3];
double price = stringToDouble(keyValuePair._2[4]);

City city = new City(cityName, price, reviewer_id, reviewer_name);
if (cityExistsInCityList(cityList, city)){
updateCity(cityList, city);
}
else{
addNewCity(cityList, city);
}
return cityList;
};
};
}

public static Function2<String[], String[], String[]> addAndCombinePairAverage(){

return new Function2<String[], String[], String[]>() {
public String[] call(String[] v1, String[] v2) throws Exception {
//{"key", "total", "number"}
v1[0] = v2[0];
if (v1[1] == null){
v1[1] = v2[1];
}else{
v1[1] = (double)(stringToDouble(v1[1]) + stringToDouble(v2[1]))+"";
}
if (v1[2] == null){
v1[2] = "1";
}
else{
v1[2] = (int)(stringToDouble(v1[2]) + 1)+"";
}
return v1;
}
};
}

public static Function2<String[], String[], String[]> combinePairAverage(){
return new Function2<String[], String[], String[]>(){

public String[] call(String[] v1, String[] v2) throws Exception {
//{"key", "total", "number"}
String key = v1[0];
String total = (double)(stringToDouble(v1[1]) + stringToDouble(v2[1]))+"";
String number = (int)(stringToDouble(v1[2]) + stringToDouble(v2[2]))+"";
String[] ret = {key, total, number};
return ret;
}
};
}

public static Function2<String[], String[], String[]> addAndCombineEstimatedNumberOfNights(){

return new Function2<String[], String[], String[]>() {
public String[] call(String[] v1, String[] v2) throws Exception {
//{"key", "total", "number"}
v1[0] = v2[0];
if (v1[1] == null){
v1[1] = (double)(stringToDouble(v2[1])/0.7*3)+"";
}else{
v1[1] = (double)(stringToDouble(v1[1]) + stringToDouble(v2[1])/0.7*3)+"";
}
return v1;
}
};
}
public static Function2<String[], String[], String[]> combinePairEstimatedNumberOfNights(){
return new Function2<String[], String[], String[]>(){

public String[] call(String[] v1, String[] v2) throws Exception {
//{"key", "total", "number"}
String key = v1[0];
String total = (double)(stringToDouble(v1[1]) + stringToDouble(v2[1]))+"";
String[] ret = {key, total};
return ret;
}
};
}

public static Function2<String[], String[], String[]> addAndCombineAmountOfMoneySpent(){

return new Function2<String[], String[], String[]>() {
public String[] call(String[] v1, String[] v2) throws Exception {
//{"key", "total amount of money spent"}
v1[0] = v2[0];
if (v1[1] == null){
v1[1] = (double)(stringToDouble(v2[1])/0.7*3*12*stringToDouble(v2[2]))+"";
}else{
v1[1] = (double)(stringToDouble(v1[1]) + stringToDouble(v2[1])/0.7*3*12*stringToDouble(v2[2]))+"";
}
String[] ret = {v1[0], v1[1]};
return ret;
}
};
}
public static Function2<String[], String[], String[]> combinePairAmountOfMoneySpent(){
return new Function2<String[], String[], String[]>(){

public String[] call(String[] v1, String[] v2) throws Exception {
//{"key", "total", "number"}
String key = v1[0];
String total = (double)(stringToDouble(v1[1]) + stringToDouble(v2[1]))+"";
String[] ret = {key, total};
return ret;
}
};
}

public static Function2<ArrayList<Host>, String[], ArrayList<Host>> addAndCombineTop3Hosts(){

return new Function2<ArrayList<Host>, String[], ArrayList<Host>>() {

public ArrayList<Host> call(ArrayList<Host> hostList, String[] v2)
throws Exception {
//{"city", "price", "id", "host_id", "host_name", "host_total_listings_count"}
double id = stringToDouble(v2[3]);
String hostName = v2[4];
double totalListings = stringToDouble(v2[5]);
double price = stringToDouble(v2[1]);
double numberOfNightsNotAvailable = stringToDouble(v2[6]);
Host host = new Host((int) id, hostName, (int)totalListings, price, (int) numberOfNightsNotAvailable);
hostList = updateTop3Hosts(hostList, host);
return hostList;
}
};
}
public static ArrayList<Host> updateTop3Hosts(ArrayList<Host> hostList, Host host){
if (hostList.contains(host)){
hostList.remove(host);
}
//adds host if null in host list
if (hostList.size() < 3){
hostList.add(host);
}
else{
for (int i = 0; i < 3; i++) {
if (host.getTotalIncome() > hostList.get(i).getTotalIncome()){
hostList.set(i, host);
hostList.remove(hostList.size()-1);
break;
}
}
}
//sort the hosts from high to low total income
Collections.sort(hostList, new Comparator<Host>() {
public int compare(Host h1, Host h2) {
if (h1.getTotalIncome() < h2.getTotalIncome()){
return 1;
}
else if (h1.getTotalIncome() > h2.getTotalIncome()){
return -1;
}
else{
return 0;
}
}
});
return hostList;
}

public static Function2<ArrayList<Host>, ArrayList<Host>, ArrayList<Host>> combinePairTop3Hosts(){
return new Function2<ArrayList<Host>, ArrayList<Host>, ArrayList<Host>>(){

public ArrayList<Host> call(ArrayList<Host> hostList1, ArrayList<Host> hostList2)
throws Exception {
for (Host host2 : hostList2) {
for (Host host1 : hostList1) {
if (host1.getId() == host2.getId()){
host2.updateTotalIncome(host1);
}
}
hostList1 = updateTop3Hosts(hostList1, host2);
}
return hostList1;
}
};
}

public static Function2<double[], Tuple2<String, String[]>, double[]> addAndCombineAverageNumberOfListings(){

return new Function2<double[], Tuple2<String, String[]>, double[]>() {

public double[] call(double[] v1, Tuple2<String, String[]> v2)
throws Exception {
v1[0] += stringToDouble(v2._2[1]);
v1[1] ++;
if (stringToDouble(v2._2[1]) > 1){
v1[2]++;
}
return v1;
}

};
}
public static Function2<double[], double[], double[]> combinePairAverageNumberOfListings(){
return new Function2<double[], double[], double[]>(){

public double[] call(double[] v1, double[] v2) throws Exception {
v1[0] += v2[0];
v1[1] += v2[1];
v1[2] += v2[2];
return v1;
}
};
}
}
@@ -1,40 +1,21 @@
package airbnb;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import scala.Serializable;

public class Host implements Serializable{

private final int id;
//listings id is key and price of listing per night is value
private HashMap<Integer, Double> listingIDs;
private double totalIncome;
private int totalListings;

private final String hostName;

public Host(int id, String hostName, int listingID, int totalListings, double price){
public Host(int id, String hostName, int totalListings, double price, int numberOfNightsNotAvailable){
this.id = id;
this.hostName = hostName;
this.totalListings = totalListings;
this.totalIncome = 0;
listingIDs = new HashMap<Integer, Double>();
listingIDs.put(listingID, price);
}

public void updateParameters(Host host){
Iterator it = host.listingIDs.entrySet().iterator();
while (it.hasNext()) {
Map.Entry pair = (Map.Entry)it.next();
if (!this.listingIDs.containsKey(pair.getKey())){
this.listingIDs.put((Integer)pair.getKey(), (Double)pair.getValue());
}
it.remove();
}
this.totalIncome = price*numberOfNightsNotAvailable;
}

public int getId() {
@@ -53,13 +34,8 @@ public int getTotalListings() {
return totalListings;
}

public void updateTotalIncome(Integer listingId, int numberOfNotAvialable){
double price = listingIDs.get(listingId);
totalIncome += price*numberOfNotAvialable;
}

public HashMap<Integer, Double> getListingIDs() {
return listingIDs;
public void updateTotalIncome(Host newHost){
totalIncome += newHost.totalIncome;
}

}
@@ -0,0 +1,84 @@
package airbnb;

import java.text.DecimalFormat;
import java.util.ArrayList;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Print {

public static DecimalFormat numberFormat = new DecimalFormat("#.00");


public static void task3a(JavaPairRDD<String, String[]> mappedListingsPairAggregated){
System.out.println("Avg booking price per night:");
mappedListingsPairAggregated.foreach(new VoidFunction<Tuple2<String, String[]>>(){
public void call(Tuple2<String, String[]> t) throws Exception {
double avg = HelpMethods.stringToDouble(t._2[1])/HelpMethods.stringToDouble(t._2[2]);
System.out.println(t._1 + ":\t$" + numberFormat.format(avg));
}
});
}

public static void task3b(JavaPairRDD<String, String[]> mappedListingsPairAggregated){
System.out.println("Avg booking price per room type per night:");
mappedListingsPairAggregated.foreach(new VoidFunction<Tuple2<String, String[]>>(){
public void call(Tuple2<String, String[]> t) throws Exception {
double avg = HelpMethods.stringToDouble(t._2[1])/HelpMethods.stringToDouble(t._2[2]);
System.out.println(t._1 + ":\t$" + numberFormat.format(avg));
}
});
}

public static void task3c(JavaPairRDD<String, String[]> mappedListingsPairAggregated){
System.out.println("Avg number of reviews per month:");
mappedListingsPairAggregated.foreach(new VoidFunction<Tuple2<String, String[]>>(){
public void call(Tuple2<String, String[]> t) throws Exception {
double avg = HelpMethods.stringToDouble(t._2[1])/HelpMethods.stringToDouble(t._2[2]);
System.out.println(t._1 + ":\t" + numberFormat.format(avg));
}
});
}

public static void task3d(JavaPairRDD<String, String[]> mappedListingsPairAggregated){
System.out.println("Estimated number of nights booked per year:");
mappedListingsPairAggregated.foreach(new VoidFunction<Tuple2<String, String[]>>(){
public void call(Tuple2<String, String[]> t) throws Exception {
System.out.println(t._1 + ":\t" + numberFormat.format(Double.parseDouble(t._2[1])));
}
});
}

public static void task3e(JavaPairRDD<String, String[]> mappedListingsPairAggregated){
System.out.println("Estimated amount of money spent on AirBnB accomodation per year:");
mappedListingsPairAggregated.foreach(new VoidFunction<Tuple2<String, String[]>>(){
public void call(Tuple2<String, String[]> t) throws Exception {
System.out.println(t._1 + ":\t$" + numberFormat.format(Double.parseDouble(t._2[1])));
}
});
}

public static void task4ab(double[] ret){
System.out.println("Global avg number of listings per host: " + numberFormat.format(ret[0]/ret[1]));
System.out.println("Percentage of hosts with more than 1 listing: " + numberFormat.format(100*ret[2]/ret[1])+"%");
}

public static void task4c(JavaPairRDD<String, ArrayList<Host>> mappedListingsPairAggregated){
mappedListingsPairAggregated.foreach(new VoidFunction<Tuple2<String, ArrayList<Host>>>(){
public void call(Tuple2<String, ArrayList<Host>> t)
throws Exception {
System.out.println("Top 3 hosts with the highsest income for city " + t._1+":");
int rank = 1;
for (Host host : t._2) {
System.out.println(rank +". Name: " + host.getHostName() + "\tID: " + host.getId() + "\t Income: $" + host.getTotalIncome());
rank++;
}

}
});
}

}
@@ -4,6 +4,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Scanner;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import javax.validation.OverridesAttribute;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -14,6 +18,7 @@
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.RowFactory;

import scala.Serializable;
import scala.Tuple2;
@@ -30,112 +35,158 @@ public Program(JavaSparkContext sc){
listings_usRDD = sc.textFile("target/listings_us.csv");
reviews_usRDD = sc.textFile("target/reviews_us.csv");
calendar_usRDD = sc.textFile("target/calendar_us.csv");
task3();
task4();
}


public static void task3(){
String[] columndNeededListings = {"city", "price", "room_type", "reviews_per_month"};
HelpMethods.mapAttributeAndIndex(listings_usRDD, 'l');
JavaRDD<String[]> mappedListings = HelpMethods.mapToColumns(listings_usRDD, columndNeededListings, 'l');

//avg booking price per night and per room type
ArrayList<City> initialCityList = new ArrayList<City>();
ArrayList<City> cityList = mappedListings.aggregate(initialCityList, HelpMethods.addAndCountCity(), HelpMethods.combineCity());

printTask3(cityList);

// String[] columndNeededCalendar = {"listing_id", "available"};
// HelpMethods.mapAttributeAndIndex(calendar_usRDD, 'c');
// JavaRDD<String[]> mappedCalendar = HelpMethods.mapToColumns(calendar_usRDD, columndNeededCalendar, 'c');
// ArrayList<Listing> initialListingList = new ArrayList<Listing>();
// ArrayList<Listing> listingList = mappedCalendar.aggregate(initialListingList, HelpMethods.addAndCountListing(), HelpMethods.combineListing());
//
// //calculating top 3 hosts with the highest income
// HelpMethods.calculateTop3HostsWithHighestIncomeForEachCity(cityList, listingList);
// printTask4(cityList, listingList);
Scanner sc = new Scanner(System.in);
System.out.println("Which subtask do you want to run? (a, b, c, d or e)");
String subtask = sc.next();
if (sc.next().equals("a")){
String[] columndNeededListingsa = {"city", "price"};
String[] keysa = {"city"};
JavaPairRDD<String, String[]> mappedListingsPair = HelpMethods.mapToPair(listings_usRDD, keysa, columndNeededListingsa, HelpMethods.attributeListingIndex);

String[] initial = new String[3];
JavaPairRDD<String, String[]> mappedListingsPairAggregated = mappedListingsPair.aggregateByKey(initial, HelpMethods.addAndCombinePairAverage(), HelpMethods.combinePairAverage());
Print.task3a(mappedListingsPairAggregated);
}
else if (sc.next().equals("b")){
String[] columndNeededListingsb = {"city", "price"};
String[] keysb = {"city", "room_type"};
JavaPairRDD<String, String[]> mappedListingsPair = HelpMethods.mapToPair(listings_usRDD, keysb, columndNeededListingsb, HelpMethods.attributeListingIndex);

String[] initial = new String[3];
JavaPairRDD<String, String[]> mappedListingsPairAggregated = mappedListingsPair.aggregateByKey(initial, HelpMethods.addAndCombinePairAverage(), HelpMethods.combinePairAverage());
Print.task3b(mappedListingsPairAggregated);
}
else if (sc.next().equals("c")){
String[] columndNeededListingsc = {"city", "reviews_per_month"};
String[] keysc = {"city"};
JavaPairRDD<String, String[]> mappedListingsPair = HelpMethods.mapToPair(listings_usRDD, keysc, columndNeededListingsc, HelpMethods.attributeListingIndex);

String[] initial = new String[3];
JavaPairRDD<String, String[]> mappedListingsPairAggregated = mappedListingsPair.aggregateByKey(initial, HelpMethods.addAndCombinePairAverage(), HelpMethods.combinePairAverage());
Print.task3c(mappedListingsPairAggregated);
}
else if (sc.next().equals("d")){
String[] columndNeededListingsd = {"city", "reviews_per_month"};
String[] keysd = {"city"};
JavaPairRDD<String, String[]> mappedListingsPair = HelpMethods.mapToPair(listings_usRDD, keysd, columndNeededListingsd, HelpMethods.attributeListingIndex);

String[] initial = new String[3];
JavaPairRDD<String, String[]> mappedListingsPairAggregated = mappedListingsPair.aggregateByKey(initial, HelpMethods.addAndCombineEstimatedNumberOfNights(), HelpMethods.combinePairEstimatedNumberOfNights());
Print.task3d(mappedListingsPairAggregated);
}
else if (sc.next().equals("e")){
String[] columndNeededListingse = {"city", "reviews_per_month", "price"};
String[] keyse = {"city"};
JavaPairRDD<String, String[]> mappedListingsPair = HelpMethods.mapToPair(listings_usRDD, keyse, columndNeededListingse, HelpMethods.attributeListingIndex);

String[] initial = new String[4];
JavaPairRDD<String, String[]> mappedListingsPairAggregated = mappedListingsPair.aggregateByKey(initial, HelpMethods.addAndCombineAmountOfMoneySpent(), HelpMethods.combinePairAmountOfMoneySpent());
Print.task3e(mappedListingsPairAggregated);
}
}

public static void task4(){
String[] columndNeededListings = {"city", "price", "room_type", "reviews_per_month", "id", "host_id", "host_name", "host_total_listings_count"};
HelpMethods.mapAttributeAndIndex(listings_usRDD, 'l');
JavaPairRDD<String, String[]> listingPairs = HelpMethods.mapToPair(listings_usRDD, "id", columndNeededListings, HelpMethods.attributeListingIndex);

String[] columndNeededCalendar = {"listing_id", "available"};
HelpMethods.mapAttributeAndIndex(calendar_usRDD, 'c');
JavaPairRDD<String, String[]> calendarPairs = HelpMethods.mapToPair(calendar_usRDD, "listing_id", columndNeededCalendar, HelpMethods.attributeCalendarIndex);

//Using left outer join because have to make sure that all listings are included in the result
JavaPairRDD<String, String[]> joinedPair = HelpMethods.lefOuterJoin(listingPairs, calendarPairs);

new JavaPairRDD<String, String[]>(null, null, null);

// joinedPair.red

// joinedPair.combineByKey(createCombiner, mergeValue, mergeCombiners)

joinedPair.foreach(new VoidFunction<Tuple2<String, String[]>>(){

public void call(Tuple2<String, String[]> t) throws Exception {
System.out.println(Arrays.toString(t._2));
}
});;

Scanner sc = new Scanner(System.in);
System.out.println("Which subtask do you want to run? (a, b or c)");
String subtask = sc.next();
if (subtask.equals("a") || subtask.equals("b")){
String[] columndNeededListings = {"host_id", "host_total_listings_count"};
String[] keysListing = {"host_id"};
JavaPairRDD<String, String[]> listingPairs = HelpMethods.mapToPair(listings_usRDD, keysListing, columndNeededListings, HelpMethods.attributeListingIndex);
JavaPairRDD<String, String[]> listingPairsReduced = HelpMethods.reduceByKeyOnHostId(listingPairs);

// listingPairsReduced.foreach(new VoidFunction<Tuple2<String, String[]>>(){
//
// public void call(Tuple2<String, String[]> t) throws Exception {
// System.out.println(Arrays.toString(t._2));
// }
// });;

//{total listings, total number of hosts, hosts with more than 1 listing}
double[] initial = {0.0, 0.0, 0.0};
double[] totalAndNumberOfListings = listingPairsReduced.aggregate(initial, HelpMethods.addAndCombineAverageNumberOfListings(), HelpMethods.combinePairAverageNumberOfListings());
Print.task4ab(totalAndNumberOfListings);
}
else if (subtask.equals("c")){
String[] columndNeededListings = {"city", "price", "id", "host_id", "host_name", "host_total_listings_count"};
HelpMethods.mapAttributeAndIndex(listings_usRDD, 'l');
String[] keys1 = {"id"};
JavaPairRDD<String, String[]> listingPairs = HelpMethods.mapToPair(listings_usRDD, keys1, columndNeededListings, HelpMethods.attributeListingIndex);

String[] columndNeededCalendar = {"listing_id", "available"};
HelpMethods.mapAttributeAndIndex(calendar_usRDD, 'c');
String[] keys2 = {"listing_id"};
JavaPairRDD<String, String[]> calendarPairs = HelpMethods.mapToPair(calendar_usRDD, keys2, columndNeededCalendar, HelpMethods.attributeCalendarIndex);

//Using left outer join because have to make sure that all listings are included in the result
JavaPairRDD<String, String[]> joinedPair = HelpMethods.lefOuterJoin(listingPairs, calendarPairs);

JavaPairRDD<String, String[]> joinedPairReducedByKey = HelpMethods.reduceByKeyOnAvailable(joinedPair);

JavaPairRDD<String, String[]> joinedPairWithCityAsKey = HelpMethods.mapToPairNewKey(joinedPairReducedByKey, 0);
//{city, hostid_rank1, hostname, income}

ArrayList<Host> initial = new ArrayList<Host>();
JavaPairRDD<String, ArrayList<Host>> joinedPairAggregated = joinedPairWithCityAsKey.aggregateByKey(initial, HelpMethods.addAndCombineTop3Hosts(), HelpMethods.combinePairTop3Hosts());
Print.task4c(joinedPairAggregated);
}
}

public static void task5(){

// String[] columnsNeededListings = {"city", "price", "id"};
// HelpMethods.mapAttributeAndIndex(listings_usRDD, 'l');
// JavaPairRDD<String, String[]> listingPairs = HelpMethods.mapToPair(listings_usRDD, "id", columnsNeededListings, HelpMethods.attributeListingIndex);
//
// String[] columnsNeededReviews = {"listing_id", "reviewer_id", "reviewer_name"};
// HelpMethods.mapAttributeAndIndex(reviews_usRDD, 'r');
// JavaPairRDD<String, String[]> reviewerPairs = HelpMethods.mapToPair(reviews_usRDD, "listing_id", columnsNeededReviews, HelpMethods.attributeReviewIndex);
//
// JavaPairRDD<String, String[]> joinedPair = HelpMethods.lefOuterJoin(reviewerPairs, listingPairs);
//
// ArrayList<City> initialCityList = new ArrayList<City>();
// ArrayList<City> cityList = joinedPair.aggregate(initialCityList, HelpMethods.addAndCountCityAndReviewers(), HelpMethods.combineCityLists());
// printTask5(cityList);
// joinedPair.foreach(new VoidFunction<Tuple2<String, String[]>>(){
// public void call(Tuple2<String, String[]> t) throws Exception {
// System.out.println(Arrays.toString(t._2));
// }
// });;

}

public static void printTask3(ArrayList<City> cityList){
int rooms = 0;
DecimalFormat numberFormat = new DecimalFormat("#.00");
public static void printTask5(ArrayList<City> cityList){
Reviewer reviewerSpendingMostMoney = null;
for (City city : cityList) {
System.out.println(city.getName());
System.out.println("AvgPrice: $" + numberFormat.format(city.getAveragePricePerNight()) + " Avg nr. of reviews: " + numberFormat.format(city.getAvgNumberOfReviewsPerMonth())
+ " Est. nr of nights: " + numberFormat.format(city.getEstimatedNumberOfNightsBookedPerYear()) + " Est. amount of money spent in a year: $" + numberFormat.format(city.getEstimatedAmountOfMoneySpentPerYear()));
System.out.println("Avg. price per room type: ");
for (RoomType roomType : city.getRoomTypeList()) {
System.out.println(roomType.getRoomType() + ": $" + numberFormat.format(roomType.getAvgPricePerNight()));
System.out.println("Top 3 guests ranked by their number of bookings: ");
int rank = 1;
for (Reviewer reviewer : city.getTop3ReviewersRankedByNumberOfBookings()) {
System.out.println(rank + ". Name: " + reviewer.getName() + "\tID: " + reviewer.getId() + "\t Number of bookings: "+ reviewer.getNumberOfBookings());
rank++;
}
rooms += city.getNumberOfRooms();
System.out.println();
}
System.out.println("Number of rooms: " + rooms);
}

public static void printTask4(ArrayList<City> cityList, ArrayList<Listing> listingList){
//4a and 4b
int numberOfHosts = 0;
int numberOfListings = 0;
int numberOfHostsWithMoreThanOneListing = 0;
for (City city : cityList) {
for (Host host : city.getHostList()) {
numberOfListings += host.getTotalListings();
numberOfHosts++;
if (host.getTotalListings() > 1){
numberOfHostsWithMoreThanOneListing++;
}
if (reviewerSpendingMostMoney == null){
reviewerSpendingMostMoney = city.getReviewerSpendingMostMoney();
}
}
double globalAverageNumberOfListingsPerHost = (double) numberOfListings/numberOfHosts;
System.out.println("Global avg number of listings per host: " + globalAverageNumberOfListingsPerHost);
double percentageOfHostsWithMoreThanOneListing = (double) numberOfHostsWithMoreThanOneListing/numberOfHosts;
System.out.println("Percentage of hosts with more than 1 listings: " + percentageOfHostsWithMoreThanOneListing);
for (City city : cityList) {
System.out.println("Top 3 in " +city.getName() + ": ");
int rank = 1;
for (Host host : city.getTop3Hosts()) {
if (host != null){
System.out.println(rank +". Name: " + host.getHostName() + "\tID: " + host.getId() + "\t Income: $" + host.getTotalIncome());
else{
if (reviewerSpendingMostMoney.getTotalAmountSpentOnAccomodation() < city.getReviewerSpendingMostMoney().getTotalAmountSpentOnAccomodation()){
reviewerSpendingMostMoney = city.getReviewerSpendingMostMoney();
}
rank++;
}
}
System.out.println();
System.out.println("The reviewer spending the most money is: ");
System.out.println("Name: " + reviewerSpendingMostMoney.getName() + " ID: "+reviewerSpendingMostMoney.getId() + " Spent: $" + reviewerSpendingMostMoney.getTotalAmountSpentOnAccomodation() + " Number of bookings: " + reviewerSpendingMostMoney.getNumberOfBookings());
}

// public static void task4(){
// String[] columndNeeded = {"host_id", "host_listings_count"};
// JavaRDD<String[]> mappedListings = HelpMethods.mapToColumns(listings_usRDD, columndNeeded);
// }



public static void main(String[] args) {
@@ -144,6 +195,7 @@ public static void main(String[] args) {
.setMaster("local[*]")
;
JavaSparkContext sc = new JavaSparkContext(conf);

new Program(sc);
}