In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import collections
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window
from haversine import haversine
import time

In [2]:
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "data").appName("SparkSQL").getOrCreate()

Hotel dataset (hotels-exmpl: 500 εγγραφές). Για λόγους διευκόλυνσης του ελέγχου η δοκιμή παρουσιάζεται σε δοκιμαστικό 
σύνολο δεδομένων για τα ξενοδοχεία σε 500 εγγραφές και για τα εστιατόρια σε 540 εγγραφές. Ελέγχθηκε και στο δοσμένο αρχικό σύνολο δεδομένων χωρίς να συναντάται χρονική απόκλιση, ωστόσο για λόγους σύγκρισης των δύο αλγορίθμων (parallel algorithm και cross join algorithm) που θα παρουσιαστούν στην εργασία επιλέχθηκε μικρό δοκιμαστικό dataset.

*Στον συγκριτικό κώδικα cross join που θα παρουσιαστεί, ο χρόνος εκτέλεσης στο δοσμένο αρχικό dataset είναι κάποιες ώρες οπότε καθίσταται χρονοβόρα η δοκιμή του. Για αυτό ο έλεγχος των δύο αλγορίθμων παρουσιάζεται σε μικρότερα σύνολα. Ωστόσο, η αποδοτικότητα του parallel algorithm που αναπτύχθηκε φάνηκε ότι δεν επηρεάζεται χρονικά από το μέγεθος του συνόλου εγγραφών, κάτι που είναι και ο σκοπός μας.

In [5]:
hotel_rdd = sc.textFile("hotels-exmpl.txt")
hotel_rdd1 = hotel_rdd.map(lambda x : (x.split("|")[0], x.split("|")[1], x.split("|")[4], x.split("|")[5]))
hotel_df1 = hotel_rdd1.toDF()
hotel_df2 = hotel_df1.selectExpr("_1 as _id", "_2 as name","_3 as lat", "_4 as lon")
#δημιουργούμε μία νέα στήλη με τιμές "Β" ώστε να διαχωρίζουμε τις εγγραφές που αφορούν τα ξενοδοχεία
hotel_df3 = hotel_df2.withColumn("dataset_id", lit("B"))  

Για να δούμε πόσες εγγραφές έχουμε από ξενοδοχεία εκτελούμε την ακόλουθη εντολή

In [4]:
hotel_df3.count() 

500

Αντίστοιχα εισάγουμε και το δοκιμαστικό σύνολο δεδομένων για τα εστιατόρια (540 εγγραφές).

In [6]:
rest_rdd2 = sc.textFile("restaurants-exmpl.txt")
rest_rdd3 = rest_rdd2.map(lambda x : (x.split("|")[0], x.split("|")[1], x.split("|")[3], x.split("|")[4]))
rest_df4 = rest_rdd3.toDF()
rest_df5 = rest_df4.selectExpr("_1 as _id", "_2 as name","_3 as lat", "_4 as lon")
#δημιουργούμε μία νέα στήλη με τιμές "Α" ώστε να διαχωρίζουμε τις εγγραφές που αφορούν τα εστιατόρια
rest_df6 = rest_df5.withColumn("dataset_id", lit("A"))

Για να δούμε πόσες εγγραφές έχουμε από ξενοδοχεία εκτελούμε την ακόλουθη εντολή

In [9]:
rest_df6.count() 

540

Aκολούθως, ενώνουμε τα δύο dataframes σε ένα που θα το ονομάσουμε df_concat.

In [10]:
df_concat = hotel_df3.union(rest_df6)
#print(df_concat.show())

Tαξινομούμε τις εγγραφές κατά αύξον γεωγραφικό πλάτος 

In [11]:
df_concat1 = df_concat.orderBy(df_concat["lat"].asc())
#print(df_concat1.show())

Προσθέτουμε μία νέα στήλη ("increase_id") που θα μας βοηθήσει στην συνέχεια ώστε να διαμοιράσουμε τα δεδομένα μας 
στα partitions. Οι τιμές ξεκινούν από το 1 και φθάνουν μέχρι και τον αριθμό που ισούται το πλήθος των εγγραφών.

In [13]:
dataset = df_concat1.withColumn("increas_id",row_number().over(Window.orderBy(monotonically_increasing_id()))-1)
dataset.show()

+---+--------------------+----------------+-----------------+----------+----------+
|_id|                name|             lat|              lon|dataset_id|increas_id|
+---+--------------------+----------------+-----------------+----------+----------+
|479|     Jack in the Box|       32.555577|      -117.051702|         A|         0|
|446|Best Western Amer...|         32.5584|       -117.06005|         B|         1|
|365|       Coco's Bakery|        32.55852|      -117.060147|         A|         2|
| 73|America's Best Va...|           32.56|      -117.062943|         B|         3|
|255|      Baskin-Robbins|       32.610854|      -117.068653|         A|         4|
|317|Best Western-Chul...|        32.61546|       -117.08347|         B|         5|
|276|Bay Breeze Inn an...|       32.629683|      -117.092798|         B|         6|
|296|         Big 7 Motel|       32.636501|      -117.091003|         B|         7|
|330|Best Western Sout...|        32.64013|       -117.09779|         B|    

Έχει έρθει η ώρα για το διαμοιρασμό των δεδομένων στα επιμέρους partitions. Ιδανικό πλήθος partitions, ώστε ο αλγόριθμος να 
επιτυγχάνει τη μέγιστη δυνατή αποδοτικότητα, είναι τόσα όσα και τα threads του υπολογιστή μας. Ο αλγόριθμος δοκιμάστηκε σε 
υπολογιστή δύο πυρήνων, καθενός εκ των οποίων "τρέχει" δύο threads. Δηλαδή, ιδανικός αριθμός partitions στην περίπτωση μας είναι 4. Σε περίπτωση που τρέχαμε τον αλγόριθμο σε cluster υπολογιστών θα πολ/ζαμε και με το πλήθος των μηχανημάτων από τα οπόια θα απαρτιζόταν το cluster μας. (δηλαδή σε επίπεδο cluster υπολογιστών θα ίσχυε: machines X cores X threads = ιδανικό πλήθος partitions).

In [14]:
dataset1 = dataset.withColumn("partition",(col("increas_id")/260).cast("int"))  
dataset2 = dataset1.withColumn("lat", dataset1["lat"].cast("float"))
dataset3 = dataset2.withColumn("lon", dataset["lon"].cast("float"))
dataset1_partitioned = dataset3.repartitionByRange(4, col("partition"))
#dataset1_partitioned.write.mode("overwrite").csv("dokimastika/results.txt")

Για να εκτελείται ο αλγόριθμος παράλληλα θα πρέπει κάθε partition να επεξεργάζεται το ερώτημα που θα του θέσουμε παρακάτω, 
ανεξάρτητα από το άλλο partition. Αυτό δημιουργεί την ανάγκη κάθε partition να περιέχει τις εγγραφές που χρειάζεται ώστε να "τρέξει" σωστά το ερώτημα. Άρα πρέπει ορισμένες εγγραφές να γίνουν αντιγραφή σε παραπάνω από ένα partition. Για να γίνει αυτό,
είναι ανάγκη να προσδιορίσουμε τα όρια κάθε partition ως προς το lat. Εντοπίζουμε, λοιπόν, το ανώτατο και κατώτατο όριο τιμών γεωγρ. πλάτους κάθε partition. 

In [15]:
w = Window.partitionBy('partition')
rdd5 = dataset1_partitioned.withColumn('part_maxlat', functions.max('lat').over(w))
rdd6 = rdd5.withColumn("part_minlat", functions.min('lat').over(w))
#rdd6.show()

Aποφασίζουμε να αντιγράψουμε τα δεδομένα των ξενοδοχείων με όρια τα +/- 0.1 μοίρες. Δηλαδή οι εγγραφές που απέχουν +/- 0.1 μοίρες από τα όρια που βρήκαμε παραπάνω, αντιγράφονται στο επόμενο ή στο προηγούμενο κελί αντίστοιχα.
Επίσης διαγράφουμε τις εγγραφές που αντιγράφηκαν σε partitions με τιμές -1 και 4 από τη στιγμή που ουσιαστικά δεν υφίστανται τέτοια partitions. 

In [16]:
dublicatesb = rdd6.filter(rdd6["dataset_id"] == "B")
duplicates_up = dublicatesb.filter("lat+0.1 > part_maxlat")
duplicates_down = dublicatesb.filter("lat-0.1 < part_minlat")
duplicates_upnew = duplicates_up.withColumn("partition", col("partition")+1)
duplicates_downnew = duplicates_down.withColumn("partition", col("partition")-1)
duplicates_all= duplicates_upnew.union(duplicates_downnew)
duplicates_teliko = duplicates_all.filter(duplicates_all["partition"] != -1)
duplicates_teliko1 = duplicates_teliko.filter(duplicates_teliko["partition"] != 4)
duplicates_teliko2 = duplicates_teliko1.orderBy(duplicates_teliko1["partition"].asc())

Ενώνουμε την τελική μορφή του αρχικού dataset (rdd6) με το dataframe που περιέχει τις αντιγραμμένες εγγραφές που κάναμε παραπάνω (duplicates_teliko2).

In [21]:
df_teliko = rdd6.union(duplicates_teliko2).distinct()
df_teliko.show()

+---+--------------------+---------+-----------+----------+----------+---------+-----------+-----------+
|_id|                name|      lat|        lon|dataset_id|increas_id|partition|part_maxlat|part_minlat|
+---+--------------------+---------+-----------+----------+----------+---------+-----------+-----------+
|358|       Coco's Bakery| 33.78827|  -117.9148|         A|       167|        0|   33.89901|  32.555576|
|369|       Coco's Bakery|33.866184|-118.352005|         A|       234|        0|   33.89901|  32.555576|
| 86|The Half Way Hous...| 34.46567|-118.405266|         A|       529|        2|   37.67892|  34.380478|
|381|Best Western King...|36.205086| -121.13951|         B|       619|        2|   37.67892|  34.380478|
|520|     Jack in the Box|36.517277| -119.56294|         A|       631|        2|   37.67892|  34.380478|
|270|      Baskin-Robbins| 37.37952|  -122.1172|         A|       728|        2|   37.67892|  34.380478|
|192|Ace Hotel and Swi...| 33.80138| -116.54003|       

Διαχωρισμός hotels-restaurants για join

In [18]:
df_teliko1 = df_teliko.cache()
hotel_df = df_teliko1.filter(df_teliko1["dataset_id"] == "B")
restaurant_df = df_teliko1.filter(df_teliko1["dataset_id"] == "A")
#hotel_df.count()          # gia na doume posa einai telika ta hotels (arxika + douplarismena) #
#restaurant_df.count()     # gia na doume posa einai telika ta estiatoria (dn exei allaksei o arithmos tous) #

In [19]:
hotel_dataframe2 = hotel_df.repartitionByRange(4, col("partition"))
restaurant_dataframe2 = restaurant_df.repartitionByRange(4, col("partition"))
#hotel_dataframe2.write.mode("overwrite").csv("dokimastika/results1.txt")
#restaurant_dataframe2.write.mode("overwrite").csv("dokimastika/results2.txt")

In [20]:
hotel_dataframe3 = hotel_dataframe2.selectExpr("_id as h_id", "name as hname","lat as hlat", "lon as hlon","partition as hpartition")
restaurant_dataframe3 = restaurant_dataframe2.selectExpr("_id as r_id", "name as rname","lat as rlat", "lon as rlon","partition as rpartition")

Join των δύο dataframes

In [22]:
join_dataframes = hotel_dataframe3.join(restaurant_dataframe3 ,[hotel_dataframe3['hpartition'] == restaurant_dataframe3['rpartition'],hotel_dataframe3['hlon'] < restaurant_dataframe3['rlon'] + 0.1, hotel_dataframe3['hlon'] > restaurant_dataframe3['rlon'] - 0.1 ],how = 'full')
#print(join_dataframes.show())
joined_cleaned = join_dataframes.dropna(how='any')
#joined_cleaned.count()     # Για να δούμε πόσες συζέυξεις θα κάνει τελικώς #

Yπολογισμός haversine distance και χρόνος εκτέλεσης του ερωτήματος

In [23]:
start_time = time.time()     
distances = joined_cleaned.rdd.map(lambda x:(x[0], x[1], x[5], x[6], haversine(( x[2] , x[3] ),( x[7] , x[8] ) )))   
print("--- %s seconds ---" % (time.time() - start_time)) # xronos ektelesis haversine (sto dokimastiko dataset (hotels_exmpl kai restuarants_exmpl) dn exei aksia na elegksoume ton xrono) #
distances_df = distances.toDF()

--- 5.838547945022583 seconds ---


Εμφάνιση αποτελεσμάτων για επιλογή απόστασης 500 μέτρα.

In [24]:
distances1 = distances_df.selectExpr("_1 as hotel_id", "_2 as hotel_name","_3 as restaurant_id", "_4 as restaurant_name", "_5 as distance")
results = distances1.filter(distances1[4] <= 0.5)  #(0.5 km)
results_Final = results.distinct()
results_Final.show()
results_Final.count()

+--------+--------------------+-------------+--------------------+--------------------+
|hotel_id|          hotel_name|restaurant_id|     restaurant_name|            distance|
+--------+--------------------+-------------+--------------------+--------------------+
|     321|Bishop Thunderbir...|          319|       Jack's Family| 0.07818610147120898|
|      10|           Ascot Inn|          314|Dorn's Original B...| 0.10099561698160375|
|     163|        Andrew Hotel|          477|     Jack in the Box|   0.204344022162059|
|     130|Alexander Inn and...|          477|     Jack in the Box|   0.222180051774984|
|      29|           Aloha Inn|          230|           Cj's Cafe|  0.3085182253087254|
|     108|   Alexis Park Hotel|          348|       Tommy's Joynt| 0.22621052947665762|
|     184|  Ansonia Abby Hotel|          112|Sears Fine Foods ...| 0.48890538575355497|
|     343|       Bel Air Hotel|          477|     Jack in the Box|  0.4124797681404847|
|     441|Bayside Inn At Th...| 

92