In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from math import sqrt
from pyspark.sql.functions import lit

In [3]:
# Reuse the SparkContext that is already running when this cell is executed
# again; constructing a second SparkContext in the same kernel raises an error.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [None]:
# Read the three datasets

In [4]:
# Load the station results and split every line on whitespace into a list of fields.
station_rdd = sc.textFile("./result/station_result").map(lambda line: line.split())

In [5]:
station_rdd.take(3)

[['Fri', 'Morning', '120', '40.68676', '-73.95929', '19', '0.02'],
 ['Fri', 'Morning', '143', '40.69239', '-73.99338', '24', '0.13'],
 ['Fri', 'Morning', '150', '40.72087', '-73.98086', '31', '0.05']]

In [6]:
station_df = station_rdd.toDF(["Day", "Time", "Id", "Latitude", "Longitude", "Total","Percent"])

In [7]:
station_df.show(3)

+---+-------+---+--------+---------+-----+-------+
|Day|   Time| Id|Latitude|Longitude|Total|Percent|
+---+-------+---+--------+---------+-----+-------+
|Fri|Morning|120|40.68676|-73.95929|   19|   0.02|
|Fri|Morning|143|40.69239|-73.99338|   24|   0.13|
|Fri|Morning|150|40.72087|-73.98086|   31|   0.05|
+---+-------+---+--------+---------+-----+-------+
only showing top 3 rows



In [8]:
# Load the traffic results and split every line on whitespace into a list of fields.
traffic_rdd = sc.textFile("./result/traffic_result_20").map(lambda line: line.split())

In [9]:
traffic_rdd.take(3)

[['40.60153', '-74.06137', 'Tue', 'Morning', '18.95'],
 ['40.60157', '-74.06018', 'Tue', 'Morning', '18.95'],
 ['40.60161', '-74.06218', 'Tue', 'Morning', '18.95']]

In [10]:
traffic_df = traffic_rdd.toDF(["Latitude", "Longitude", "Day", "Time", "Speed"])

In [11]:
traffic_df.show(3)

+--------+---------+---+-------+-----+
|Latitude|Longitude|Day|   Time|Speed|
+--------+---------+---+-------+-----+
|40.60153|-74.06137|Tue|Morning|18.95|
|40.60157|-74.06018|Tue|Morning|18.95|
|40.60161|-74.06218|Tue|Morning|18.95|
+--------+---------+---+-------+-----+
only showing top 3 rows



In [12]:
# Load the trip results and split every line on whitespace into a list of fields.
trip_rdd = sc.textFile("./result/trip_result").map(lambda line: line.split())

In [13]:
trip_rdd.take(3)

[['116', 'Mon', 'Night', '-3'],
 ['116', 'Thu', 'Morning', '-1'],
 ['116', 'Tue', 'Night', '-2']]

In [14]:
trip_df = trip_rdd.toDF(["Id", "Day", "Time", "Rate"])

In [15]:
trip_df.show(3)

+---+---+-------+----+
| Id|Day|   Time|Rate|
+---+---+-------+----+
|116|Mon|  Night|  -3|
|116|Thu|Morning|  -1|
|116|Tue|  Night|  -2|
+---+---+-------+----+
only showing top 3 rows



In [9]:
# Inner join station_data and trip_data

In [16]:
bike_df = station_df.join(trip_df, on = ["Id", "Time", "Day"])

In [17]:
bike_df.show(3)

+----+-------+---+--------+---------+-----+-------+----+
|  Id|   Time|Day|Latitude|Longitude|Total|Percent|Rate|
+----+-------+---+--------+---------+-----+-------+----+
| 244|Morning|Wed|40.69196|-73.96537|   31|    0.1|  -3|
|3060|Morning|Thu|40.69425|-73.94627|   19|   0.12|  -1|
|3096|   Noon|Sat|40.71924|-73.95242|   39|   0.13|  -3|
+----+-------+---+--------+---------+-----+-------+----+
only showing top 3 rows



In [25]:
# Select Monday morning to analyze

In [124]:
bike_curr = bike_df.filter((bike_df["Day"] == "Mon") & (bike_df["Time"] == "Morning"))

In [125]:
bike_curr.show(3)

+----+-------+---+--------+---------+-----+-------+----+
|  Id|   Time|Day|Latitude|Longitude|Total|Percent|Rate|
+----+-------+---+--------+---------+-----+-------+----+
|3354|Morning|Mon|40.66813|-73.97364|   31|   0.01|  -1|
| 349|Morning|Mon| 40.7185| -73.9833|   23|    0.0|  -4|
|3288|Morning|Mon| 40.7783|-73.94882|   39|   0.02|  -3|
+----+-------+---+--------+---------+-----+-------+----+
only showing top 3 rows



In [20]:
traffic_curr = traffic_df.filter((traffic_df["Day"] == "Mon") & (traffic_df["Time"] == "Morning"))

In [22]:
traffic_curr.show(3)

+--------+---------+---+-------+-----+
|Latitude|Longitude|Day|   Time|Speed|
+--------+---------+---+-------+-----+
|  40.609|-74.14958|Mon|Morning| 1.86|
|40.60933|-74.15027|Mon|Morning| 1.86|
|40.60989|-74.15113|Mon|Morning| 1.86|
+--------+---------+---+-------+-----+
only showing top 3 rows



In [39]:
# Combine the three factors (percent, rate, distance)

In [None]:
# Get the distance score

In [35]:
def getScoreFromPoint(bike_curr, traffic_curr):
    """Score every bike station by its distance to the nearest traffic point.

    Both arguments must provide ``collect()`` returning rows that can be
    indexed by ``"Latitude"`` and ``"Longitude"``; the values are converted
    with ``float()``.

    For each station the straight-line distance (computed directly on the
    coordinate values) to the closest traffic point is found. Traffic points
    whose latitude or longitude differs from the station's by more than 0.01
    are ignored. The score grows linearly from 0.0 (a traffic point at the
    same coordinates) up to 0.2, which is reached when the nearest point is
    0.002 or further away, or when no traffic point lies inside the window.

    Returns:
        list of float: one score per station, in the order returned by
        ``bike_curr.collect()``.
    """
    max_score = 0.2         # score when no traffic point is close to the station
    full_score_dis = 0.002  # distance at or beyond which the score saturates
    window = 0.01           # per-axis cut-off used to skip far-away points cheaply

    # Collect each input once and iterate over the collected lists. Sizing
    # the loops with .count() would trigger additional Spark jobs (for the
    # traffic data, one per station) just to learn lengths we already have.
    bike_rows = bike_curr.collect()
    # Parse the traffic coordinates once instead of once per station.
    traffic_points = [
        (float(row["Latitude"]), float(row["Longitude"]))
        for row in traffic_curr.collect()
    ]

    scores = []
    for bike_row in bike_rows:
        bike_lat = float(bike_row["Latitude"])
        bike_lon = float(bike_row["Longitude"])
        # Sentinel larger than any distance that can occur inside the window,
        # so a station without nearby traffic ends up with the maximum score.
        min_dis = 1
        for traffic_lat, traffic_lon in traffic_points:
            d_lat = bike_lat - traffic_lat
            d_lon = bike_lon - traffic_lon
            if abs(d_lat) > window or abs(d_lon) > window:
                continue
            min_dis = min(sqrt(d_lat ** 2 + d_lon ** 2), min_dis)
        if min_dis >= full_score_dis:
            scores.append(max_score)
        else:
            scores.append((min_dis / full_score_dis) * max_score)
    return scores

In [36]:
dist_score = getScoreFromPoint(bike_curr, traffic_curr)

In [None]:
# add another two factors and combine with distance score

In [56]:
def findLestScore(lst):
    """Return the smallest ``"Rate"`` value found in ``lst``, never above 0.

    Each element of ``lst`` is indexed by ``"Rate"`` and converted with
    ``float()``. The search is seeded with 0, so an empty ``lst`` or one
    holding only positive rates yields 0.
    """
    # The seed comes first so that, on ties, it wins exactly as in a
    # running ``min(current, new)`` loop.
    return min([0] + [float(entry["Rate"]) for entry in lst])

In [63]:
def combineThreeFactor(dist_score, bike_curr):
    """Combine the percent, rate and distance factors into one score per station.

    ``bike_curr.collect()`` supplies the station rows; each row's ``"Percent"``
    and ``"Rate"`` are converted with ``float()``. ``dist_score`` is read by
    position, so ``dist_score[i]`` must belong to the i-th collected row.

    Returns:
        list of float: one combined score per collected row, in collect order.
    """
    station_rows = bike_curr.collect()
    # Normaliser for the rate term, derived from the most negative rate.
    # It is zero when that rate is exactly -1, in which case the division
    # below raises ZeroDivisionError.
    rate_norm = -1 * findLestScore(station_rows) - 1
    combined = []
    for idx, station in enumerate(station_rows):
        percent = float(station["Percent"])
        rate = float(station["Rate"])
        percent_part = (0.2 - percent) * 2
        rate_part = (-rate + 1) * (0.4 / rate_norm)
        combined.append(percent_part + rate_part + dist_score[idx])
    return combined

In [64]:
final_score = combineThreeFactor(dist_score, bike_curr)

In [66]:
# add final score to original table

In [162]:
id_col = bike_curr.select("Id").collect()

In [163]:
# Pair every station Id with its final score by position: the i-th entry of
# id_col is matched with the i-th entry of final_score.
id_score = [[id_col[i]["Id"], final_score[i]] for i in range(len(final_score))]

In [165]:
id_col_rdd = sc.parallelize(id_score)

In [167]:
id_col_rdd.take(3)

[['3354', 0.618095238095238],
 ['349', 0.6952380952380952],
 ['3288', 0.6361904761904762]]

In [168]:
id_score_df = id_col_rdd.toDF(["Id", "Score"])

In [181]:
bike_curr_with_score = bike_curr.join(id_score_df, on = ["Id"])

In [None]:
# Sort by Score (descending) and show the top 10 scores

In [182]:
bike_curr_with_score_sorted = bike_curr_with_score.sort("Score", ascending=False)

In [183]:
bike_curr_with_score_sorted.show(10)

+----+-------+---+--------+---------+-----+-------+----+------------------+
|  Id|   Time|Day|Latitude|Longitude|Total|Percent|Rate|             Score|
+----+-------+---+--------+---------+-----+-------+----+------------------+
| 511|Morning|Mon|40.72938|-73.97773|   33|   0.02| -22|0.9980952380952381|
| 445|Morning|Mon| 40.7274|-73.98143|   42|   0.03| -21|0.9590476190476191|
| 394|Morning|Mon|40.72521|-73.97769|   32|   0.01| -10|0.7895238095238095|
|3178|Morning|Mon|40.78414|-73.98363|   39|    0.0|  -5|0.7142857142857144|
| 473|Morning|Mon| 40.7211|-73.99193|   38|    0.0|  -5|0.7142857142857144|
| 393|Morning|Mon|40.72299|-73.97996|   31|    0.0|  -5|0.7142857142857144|
| 265|Morning|Mon|40.72229|-73.99148|   35|    0.0|  -4|0.6952380952380952|
| 349|Morning|Mon| 40.7185| -73.9833|   23|    0.0|  -4|0.6952380952380952|
| 356|Morning|Mon|40.71622|-73.98262|   23|    0.0|  -4|0.6952380952380952|
|3341|Morning|Mon|40.79534|-73.96186|   59|    0.0|  -4|0.6952380952380952|
+----+------

In [185]:
bike_curr_with_score_sorted.toPandas().to_csv("Analyze_Result")