In [4]:
from pyspark.sql import SparkSession
from datetime import datetime
import math
spark = SparkSession.builder.getOrCreate()

# RDD method

## Input Data

In [1]:
registerRDD_raw = sc.textFile("/data/students/bigdata_internet/lab3/register.csv").map(lambda row:row.split("\t"))
# print(registerRDD_raw.take(3))
print("Total line of the raw register RDD = ",registerRDD_raw.count())
def regFilter(e):
    if(e[0]=="station"):
        return False
    if(e[2]=='0' and e[3]=='0'):
        return False
    return True
registerRDD = registerRDD_raw.filter(regFilter)
# print(registerRDD.take(3))
print("Lines of the filted register RDD = ", registerRDD.count())

stationsRDD= sc.textFile("/data/students/bigdata_internet/lab3/stations.csv").map(lambda row:row.split("\t")).filter(lambda row:False if row[0]=="id" else True)
# stationsRDD.take(3)

Total line of the raw register RDD =  25319029
Lines of the filted register RDD =  25104121


In [2]:
# transfrom stationsRDD to pair RDD
stationsPairRDD = stationsRDD.map(lambda e:(e[0],[e[1],e[2],e[3]]))
stationsPairRDD.take(3)

[('1', ['2.180019', '41.397978', 'Gran Via Corts Catalanes']),
 ('2', ['2.176414', '41.394381', 'Plaza TetuÃ¡n']),
 ('3', ['2.181164', '41.393750', 'Ali Bei'])]

## Exercise 

In [5]:
# calcultate the criticality
def registerRemap(row):
    key = str(row[0])+"-"+datetime.strptime(row[1],"%Y-%m-%d %H:%M:%S").strftime("%w-%H")
    val=[0,1] #val[0] is the flag for cretical reading, val[1] is a counter for reading
    if row[3]=='0':
        val[0] = 1 #the station is critical for that reading
    return (key,val)

STPairedRegisterRDD = registerRDD.map(registerRemap)
creticality=STPairedRegisterRDD.reduceByKey(lambda e1,e2:[e1[0]+e2[0],e1[1]+e2[1]]).mapValues(lambda v :format(v[0]/v[1],'.3f'))
creticality.take(3)

[('1-5-05', '0.162'), ('1-5-08', '0.015'), ('1-5-13', '0.023')]

In [6]:
# filter the pairs greater than the threshold
threshold = 0.6
def thresholdFilter(tup):
    (key,val)=tup
    if float(val)>=threshold:
        return True
    return False
creticalityFilted = creticality.filter(thresholdFilter)
creticalityFilted.take(3)

[('58-1-01', '0.624'), ('9-5-10', '0.613'), ('9-5-22', '0.626')]

In [7]:
# order the result by increasing criticality
def swapKeyVal(tup):
    (key,val)= tup
    return (val,key)
creticalityFiltedSorted = creticalityFilted.map(swapKeyVal).sortByKey().map(swapKeyVal)
creticalityFiltedSorted.take(3)

[('9-5-10', '0.613'), ('10-6-00', '0.622'), ('58-1-01', '0.624')]

In [8]:
# join the criticality with the station inforamtion and store
def remapCreticality(tup):
    (key,val)=tup
    keyList = key.split("-")
    day = "Sunday"
    if keyList[1]=='1':
        day="Monday"
    elif keyList[1]=='2':
        day="Tuesday"
    elif keyList[1]=='3':
        day="Wednesday"
    elif keyList[1]=='4':
        day ="Thursday"
    elif keyList[1]=='5':
        day ="Friday"
    elif keyList[1]=='6':
        day = "Saturday"
    valList = []
    valList.append(day)
    valList.append(keyList[2])
    valList.append(val)
    return (keyList[0],valList)
creticalityFiltedSortedJoined=creticalityFiltedSorted.map(remapCreticality).join(stationsPairRDD)

def remapToResult(tup):
    (key,(v1,v2))=tup
    result = []
    result.append(key)
    result.append(v2[0])
    result.append(v2[1])
    result.append(v1[0])
    result.append(v1[1])
    result.append(v1[2])
    return result
ResultRDD = creticalityFiltedSortedJoined.map(remapToResult)
print("The number of obtained critical pairs is: ",ResultRDD.count())
print(ResultRDD.collect())

The number of obtained critical pairs is:  5
[['10', '2.185206', '41.384875', 'Saturday', '00', '0.622'], ['9', '2.185294', '41.385006', 'Friday', '10', '0.613'], ['9', '2.185294', '41.385006', 'Friday', '22', '0.626'], ['58', '2.170736', '41.377536', 'Monday', '01', '0.624'], ['58', '2.170736', '41.377536', 'Monday', '00', '0.632']]


In [11]:
df = spark.createDataFrame(ResultRDD,["station","station longitude","station latitude",\
                                     "day of week","hour","criticality value"])
df.write.csv("./lab3/RDD_result.csv",header=True,sep="\t")

# Dataframe Method

## Input Data

In [13]:
df_register_raw = spark.read.load("/data/students/bigdata_internet/lab3/register.csv",
                             format="csv",
                             header=True,
                             inferSchema=True,
                             sep="\t")
# df_register_raw.show(3)
print("The total number of data is=",df_register_raw.count())
df_stations = spark.read.load("/data/students/bigdata_internet/lab3/stations.csv",
                             format="csv",
                             header=True,
                             inferSchema=True,
                             sep="\t")
# df_stations.show(3)

# fitering the uncorrect data
df_register = df_register_raw.filter("used_slots!=0 or free_slots!=0 ")
print("Number of data after filtering=",df_register.count())

The total number of data is= 25319028
Number of data after filtering= 25104121


## Exercise

In [14]:
spark.udf.register("critical",lambda x:1 if x==0 else 0)
spark.udf.register("calCriticality",lambda a,b:format(a/b,'.3f'))
# calculate the critiality
dfRemap = df_register.selectExpr("station","date_format(timestamp,'EEEE') AS day", "date_format(timestamp,'H') As hour","critical(free_slots) AS critical" )
dfRemap.show(3)
dfRemap.createOrReplaceTempView("register")

dfSumCount = spark.sql("SELECT station,day,hour,sum(critical) AS numCritial,count(critical) AS totalNum FROM register GROUP BY station,day,hour")
dfCriticality = dfSumCount.selectExpr("station","day","hour","calCriticality(numCritial,totalNum) AS criticality")
dfCriticality.show(3)

+-------+--------+----+--------+
|station|     day|hour|critical|
+-------+--------+----+--------+
|      1|Thursday|  12|       0|
|      1|Thursday|  12|       0|
|      1|Thursday|  12|       0|
+-------+--------+----+--------+
only showing top 3 rows

+-------+---------+----+-----------+
|station|      day|hour|criticality|
+-------+---------+----+-----------+
|      1|Wednesday|  19|      0.004|
|      3|   Friday|   4|      0.230|
|      6| Saturday|  22|      0.134|
+-------+---------+----+-----------+
only showing top 3 rows



In [16]:
# filter the data greater than the threshold
threshold=0.6
condition = "criticality>="+str(threshold)
dfSelected=dfCriticality.filter(condition)
dfSelected.show()

+-------+--------+----+-----------+
|station|     day|hour|criticality|
+-------+--------+----+-----------+
|     58|  Monday|   0|      0.632|
|      9|  Friday|  10|      0.613|
|     10|Saturday|   0|      0.622|
|     58|  Monday|   1|      0.624|
|      9|  Friday|  22|      0.626|
+-------+--------+----+-----------+



In [17]:
# sorting
dfSorted = dfSelected.sort("criticality")
dfSorted.show()

+-------+--------+----+-----------+
|station|     day|hour|criticality|
+-------+--------+----+-----------+
|      9|  Friday|  10|      0.613|
|     10|Saturday|   0|      0.622|
|     58|  Monday|   1|      0.624|
|      9|  Friday|  22|      0.626|
|     58|  Monday|   0|      0.632|
+-------+--------+----+-----------+



In [20]:
# combine two df
df_result = dfSorted.join(df_stations,dfSorted.station==df_stations.id).select("station","latitude","longitude","day","hour","criticality")
print("The number of obtained critical pairs is: ",df_result.count())
df_result.show()

The number of obtained critical pairs is:  5
+-------+---------+---------+--------+----+-----------+
|station| latitude|longitude|     day|hour|criticality|
+-------+---------+---------+--------+----+-----------+
|      9|41.385006| 2.185294|  Friday|  10|      0.613|
|     10|41.384875| 2.185206|Saturday|   0|      0.622|
|     58|41.377536| 2.170736|  Monday|   1|      0.624|
|      9|41.385006| 2.185294|  Friday|  22|      0.626|
|     58|41.377536| 2.170736|  Monday|   0|      0.632|
+-------+---------+---------+--------+----+-----------+



In [21]:
df_result.write.csv("./lab3/Dataframe_result.csv",header=True,sep="\t")

# BONUS TASK

In [22]:
#station distance calculation
def distanceToCenter(lat2,long2):
    toRad = 0.01745329252
    lat1 = 41.386904*toRad
    long1 =2.169989*toRad
    lat2=lat2*toRad
    long2=long2*toRad
    r = 6356.725 # km
    temp1 =math.sin((lat2-lat1)/2)*math.sin((lat2-lat1)/2)
    temp2 = math.cos(lat1)*math.cos(lat2)*math.sin((long2-long1)/2)*math.sin((long2-long1)/2)
    return format(2*r*math.asin(math.sqrt(temp1+temp2)),'.3f') 
spark.udf.register("distanceCal",lambda lat,long:distanceToCenter(lat,long))
dfStationDist = df_stations.selectExpr("id","name","distanceCal(latitude,longitude) AS distance")
dfStationDist.show(3)

+---+--------------------+--------+
| id|                name|distance|
+---+--------------------+--------+
|  1|Gran Via Corts Ca...|   1.485|
|  2|       Plaza TetuÃ¡n|   0.987|
|  3|             Ali Bei|   1.201|
+---+--------------------+--------+
only showing top 3 rows



In [23]:
# Average among reading of used slots
df_register.createOrReplaceTempView("register2")
dfStationAvg = spark.sql("SELECT station, avg(used_slots) AS avg_used FROM register2 GROUP BY station")
dfStationAvg.show(3)

+-------+-----------------+
|station|         avg_used|
+-------+-----------------+
|    148| 9.81105348279516|
|    243|9.272743674168959|
|     31|5.277462216695235|
+-------+-----------------+
only showing top 3 rows



In [25]:
dfStationNear = dfStationDist.filter("distance<1.5").join(dfStationAvg,dfStationAvg.station==dfStationDist.id)
dfStationNear.show(3)
U1=dfStationNear.selectExpr("avg(avg_used) AS U1" )
U1.show()

+---+----------------+--------+-------+-----------------+
| id|            name|distance|station|         avg_used|
+---+----------------+--------+-------+-----------------+
| 65|Rambla Catalunya|   0.554|     65| 7.31260474158157|
|126|          Girona|   1.179|    126|4.796550416136734|
| 81|        Casanova|   1.464|     81|6.274346461792722|
+---+----------------+--------+-------+-----------------+
only showing top 3 rows

+-----------------+
|               U1|
+-----------------+
|8.103784376830165|
+-----------------+



In [24]:
dfStationFar = dfStationDist.filter("distance>=1.5").join(dfStationAvg,dfStationAvg.station==dfStationDist.id)
dfStationFar.show(3)
U2=dfStationFar.selectExpr("avg(avg_used) AS U2" )
U2.show()

+---+-----------+--------+-------+-----------------+
| id|       name|distance|station|         avg_used|
+---+-----------+--------+-------+-----------------+
|148|Rambla Prim|   4.801|    148| 9.81105348279516|
|243|  PalÃ¨ncia|   3.866|    243|9.272743674168959|
| 31|  ProvenÃ§a|   1.570|     31|5.277462216695235|
+---+-----------+--------+-------+-----------------+
only showing top 3 rows

+-----------------+
|               U2|
+-----------------+
|7.889224284652577|
+-----------------+



Answer: There are more used the station closer to center. U1=8.10  U2=7.89