<h1 style="color: #9966cc">BigData_A4</h1>

In [21]:
#!/home/ubuntu/ML/anaconda3/bin/pip install geopandas==0.8 rtree

In [1]:
import json
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon, MultiPolygon
from geopandas.tools import sjoin

In [2]:
#!/home/ubuntu/ML/anaconda3/bin/pip install folium

In [3]:
import glob
import os
import sys

# Point this kernel at a local Spark install and make the driver and the
# Python workers use the same Anaconda interpreter.
# NOTE(review): machine-specific absolute paths — adjust for your environment.
os.environ["SPARK_HOME"] = "/home/ubuntu/BigData/spark"
os.environ["PYSPARK_PYTHON"] = "/home/ubuntu/ML/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/ubuntu/ML/anaconda3/bin/python"

spark_home = os.environ.get("SPARK_HOME")
sys.path.insert(0, os.path.join(spark_home, "python"))

# The bundled py4j archive name carries a version that differs between Spark
# releases, so look it up instead of hard-coding it. Fall back to the name that
# was previously hard-coded when nothing matches. (sorted() is lexical; with a
# single archive present, as in a normal install, this is irrelevant.)
py4j_zips = sorted(glob.glob(os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")))
py4j_zip = py4j_zips[-1] if py4j_zips else os.path.join(spark_home, "python/lib/py4j-0.10.7-src.zip")
sys.path.insert(0, py4j_zip)

In [4]:
import pyspark
from pyspark.sql import SparkSession, Row

In [5]:
# Spark configuration for a local session with 4 worker threads.
# NOTE(review): with a local[...] master everything runs inside the driver
# JVM, so the executor.* settings probably have no effect here — confirm.
conf = (
    pyspark.SparkConf()
    .set("spark.driver.memory", "2g")
    .set("spark.executor.memory", "1g")
    # The property is "spark.executor.cores" (plural). The former
    # "spark.executor.core" key is not a Spark setting and was silently ignored.
    .set("spark.executor.cores", "2")
    .setAppName("bikeGraphApp")
    .setMaster("local[4]")
    .set("spark.driver.host", "127.0.0.1")
)

In [6]:
# Create the session from `conf`, or reuse one that is already running.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [7]:
import pyspark.sql.functions as F
from pyspark.sql.types import (
    StructType, 
    StructField, 
    StringType, 
    IntegerType, 
    DoubleType
)

In [8]:
# Keep a handle on the session's SparkContext; the bare name on the last
# line makes the notebook display its summary.
sc = spark.sparkContext
sc

<h2 style="color: #3caa3c">Task 1</h2>

In [9]:
# Root folder of the input datasets (machine-specific absolute path).
BASE_PATH = "/home/ubuntu/BigData/datasets"
# The explicit file:// scheme makes Spark read from the local filesystem.
trips_data_path = "file://" + BASE_PATH + "/201902-citibike-tripdata.csv"

In [10]:
# Read the trip CSV. The header row supplies the column names, and
# inferSchema asks Spark to detect the column types.
df_trips = spark.read.csv(
    trips_data_path,
    header="true",
    inferSchema="true",
    sep=",",
)

trip_count = df_trips.count()
print("Total number of trips:", trip_count)
df_trips.show(n=5, truncate=True)

Total number of trips: 943744
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+
|tripduration|           starttime|            stoptime|start station id|  start station name|start station latitude|start station longitude|end station id|    end station name|end station latitude|end station longitude|bikeid|  usertype|birth year|gender|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+
|         219|2019-02-01 00:00:...|2019-02-01 00:03:...|            3494|E 115 St & Lexing...|             40.797911|               -73.9423|          3501|E 118 St & Madiso...|          40.8014866| 

<h2>Количество поездок, начавшихся и закончившихся на каждой станции</h2>

In [11]:
# Per-station trip counts. One projection per trip endpoint, with uniform
# column names (id, lat, lng) so both sides can be aggregated the same way.
df_stations_start = df_trips.select(F.col("start station id").alias("id"),
                                    F.col("start station latitude").alias("lat"),
                                    F.col("start station longitude").alias("lng"))

df_stations_end = df_trips.select(F.col("end station id").alias("id"),
                                  F.col("end station latitude").alias("lat"),
                                  F.col("end station longitude").alias("lng"))

df_stations_start.createOrReplaceTempView("trips_st")
df_stations_end.createOrReplaceTempView("trips_end")
# These two keep their original shapes: Task 4 registers them as temp views.
df_count_start = spark.sql("SELECT id,lat,lng, count(*) as trips_number_start from trips_st group by id, lat,lng")
df_count_end = spark.sql("SELECT id, count(*) as trips_number_end from trips_end group by id")

# One coordinate pair per station, taken from both trip endpoints. This way a
# station seen only as a destination (or only as an origin) still gets a
# location. min() over the (lat, lng) struct keeps the pair together and
# picks deterministically when a station was recorded at several positions.
df_station_coords = (df_stations_start.union(df_stations_end)
                     .groupBy("id")
                     .agg(F.min(F.struct("lat", "lng")).alias("pos"))
                     .select("id",
                             F.col("pos.lat").alias("lat"),
                             F.col("pos.lng").alias("lng")))

# df_count_start has one row per (id, lat, lng). Collapse it to one row per
# id so that each station contributes its start count exactly once.
df_start_totals = (df_count_start
                   .groupBy("id")
                   .agg(F.sum("trips_number_start").alias("trips_number_start")))

# Left joins from the full station list, instead of an inner join of starts
# with ends, keep stations that only occur on one side; their missing count
# becomes 0.
df_stations = (df_station_coords
               .join(df_start_totals, on="id", how="left")
               .join(df_count_end, on="id", how="left")
               .fillna(0, subset=["trips_number_start", "trips_number_end"]))

df_stations.show(5)

+----+------------------+------------------+------------------+----------------+
|  id|               lat|               lng|trips_number_start|trips_number_end|
+----+------------------+------------------+------------------+----------------+
| 296|       40.71413089|       -73.9970468|              1795|            1825|
|3414|40.680944723477296|-73.97567331790923|               656|             681|
|3606|          40.74252|        -73.948852|               215|             224|
| 467|       40.68312489|      -73.97895137|              1276|            1312|
|3368|        40.6728155|      -73.98352355|               745|             769|
+----+------------------+------------------+------------------+----------------+
only showing top 5 rows



<h2>Сопоставление станций с кварталами. Подсчёт количества поездок, начавшихся и закончившихся в каждом квартале</h2>

In [12]:
# NYC taxi-zone polygons (GeoJSON) stored next to the trip data.
borough_data_path = "{}/NYC Taxi Zones.geojson".format(BASE_PATH)

In [13]:
# GeoJSON is UTF-8 by specification (RFC 7946). Pass the encoding explicitly
# so that decoding zone names does not depend on the machine's locale default.
with open(borough_data_path, encoding="utf-8") as f:
    zones_geojson = json.load(f)

In [14]:
# Zone-table columns: the property keys of the first feature (in dict order),
# then the "geometry" column that create_zone_rows appends last.
# NOTE(review): assumes every feature has the same property keys in the same
# order — confirm for other zone files.
column_name_list = list(zones_geojson["features"][0]["properties"]) + ["geometry"]

In [15]:
def create_zone_rows(features):
    """Yield one table row per GeoJSON zone feature.

    Each row holds the feature's property values (in the feature's own key
    order), followed by its geometry as a shapely MultiPolygon. This matches
    ``column_name_list`` (property keys + "geometry").

    Both "MultiPolygon" and plain "Polygon" geometries are accepted; a Polygon
    is wrapped into a one-part MultiPolygon. Interior rings (holes) are kept.
    GeoJSON stores each polygon as [exterior, hole, hole, ...], and using only
    ring 0 made points inside a hole count as belonging to the zone.

    Args:
        features: the ``features`` list of a GeoJSON FeatureCollection.

    Yields:
        list: the property values followed by the MultiPolygon.
    """
    for item in features:
        row = list(item["properties"].values())
        geometry = item["geometry"]
        if geometry["type"] == "Polygon":
            # A Polygon's coordinates are a single list of rings.
            polygon_coords = [geometry["coordinates"]]
        else:
            polygon_coords = geometry["coordinates"]
        polygons = [Polygon(rings[0], rings[1:]) for rings in polygon_coords]
        row.append(MultiPolygon(polygons=polygons))
        yield row

In [54]:
# Materialise every zone feature as one row of a pandas table.
df_zones_pn = pd.DataFrame(list(create_zone_rows(zones_geojson["features"])),
                           columns=column_name_list)

Unnamed: 0,shape_area,objectid,shape_leng,location_id,zone,borough,geometry
0,0.0007823067885,1,0.116357453189,1,Newark Airport,EWR,(POLYGON ((-74.18445299999996 40.6949959999999...
1,0.00486634037837,2,0.43346966679,2,Jamaica Bay,Queens,(POLYGON ((-73.82337597260663 40.6389870471767...
2,0.000314414156821,3,0.0843411059012,3,Allerton/Pelham Gardens,Bronx,(POLYGON ((-73.84792614099985 40.8713422339999...
3,0.000111871946192,4,0.0435665270921,4,Alphabet City,Manhattan,(POLYGON ((-73.97177410965318 40.7258212813370...
4,0.000497957489363,5,0.0921464898574,5,Arden Heights,Staten Island,(POLYGON ((-74.17421738099989 40.5625680859999...


In [56]:
# Promote the pandas table to a GeoDataFrame whose active geometry is the
# "geometry" column (shapely MultiPolygons).
gdf_zones = gpd.GeoDataFrame(df_zones_pn, geometry="geometry")

Unnamed: 0,shape_area,objectid,shape_leng,location_id,zone,borough,geometry
0,0.0007823067885,1,0.116357453189,1,Newark Airport,EWR,"MULTIPOLYGON (((-74.18445 40.69500, -74.18449 ..."
1,0.00486634037837,2,0.43346966679,2,Jamaica Bay,Queens,"MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ..."
2,0.000314414156821,3,0.0843411059012,3,Allerton/Pelham Gardens,Bronx,"MULTIPOLYGON (((-73.84793 40.87134, -73.84725 ..."
3,0.000111871946192,4,0.0435665270921,4,Alphabet City,Manhattan,"MULTIPOLYGON (((-73.97177 40.72582, -73.97179 ..."
4,0.000497957489363,5,0.0921464898574,5,Arden Heights,Staten Island,"MULTIPOLYGON (((-74.17422 40.56257, -74.17349 ..."


In [19]:
# Ship the zone polygons to the workers once, as a broadcast variable,
# instead of capturing them in every task closure.
bc_zones = sc.broadcast(gdf_zones)
bc_zones

<pyspark.broadcast.Broadcast at 0x7f2c789d99e8>

In [20]:
def zone_contains_v2_df(rows):
    """Tag every station of one partition with the taxi zone it falls in.

    Meant for ``RDD.mapPartitions``: ``rows`` is an iterable of records with
    ``lng``, ``lat``, ``trips_number_start`` and ``trips_number_end`` fields.
    All points of the partition are joined against the broadcast zone table
    in a single ``sjoin`` call. The join is a left join, so stations outside
    every zone are still emitted, with missing zone fields.

    Yields:
        Row(id, zone, trips_number_start, trips_number_end), one per
        (station, matching zone) pair produced by the join.
    """
    columns = ["lng", "lat", "trips_number_start", "trips_number_end"]
    points = [[row[name] for name in columns] for row in rows]
    if not points:
        return
    df_points_pn = pd.DataFrame(points, columns=columns)
    station_geoms = gpd.points_from_xy(df_points_pn["lng"], df_points_pn["lat"])
    gdf_points = gpd.GeoDataFrame(df_points_pn, geometry=station_geoms)
    matches = sjoin(gdf_points, bc_zones.value, how="left")
    for _, match in matches.iterrows():
        yield Row(id=match["location_id"],
                  zone=match["zone"],
                  trips_number_start=match["trips_number_start"],
                  trips_number_end=match["trips_number_end"])

In [21]:
#df_stations.rdd.mapPartitions(zone_contains_v2_df).toDF().show(5)

In [22]:
# Attach a zone to every station, then total the station counts per zone.
df_zone_stations = df_stations.rdd.mapPartitions(zone_contains_v2_df).toDF()
df_zone_count = (df_zone_stations
                 .groupBy("id", "zone")
                 .agg(F.sum("trips_number_start").alias("All_start_trips"),
                      F.sum("trips_number_end").alias("All_end_trips")))
df_zone_count.show(5)

+---+-------------+---------------+-------------+
| id|         zone|All_start_trips|All_end_trips|
+---+-------------+---------------+-------------+
|162| Midtown East|          13524|        13675|
| 52|  Cobble Hill|           2809|         2727|
| 97|  Fort Greene|          15506|        16413|
| 78| East Tremont|              1|            9|
|  4|Alphabet City|          10752|        10054|
+---+-------------+---------------+-------------+
only showing top 5 rows



<h2>Кварталы в порядке убывания количества завершённых поездок</h2>

In [23]:
# Zones ordered by the number of trips ending there, busiest first.
df_end_sort = df_zone_count.orderBy(F.col("All_end_trips").desc())
df_end_sort.show(5)

+---+--------------------+---------------+-------------+
| id|                zone|All_start_trips|All_end_trips|
+---+--------------------+---------------+-------------+
| 79|        East Village|          43333|        43170|
| 68|        East Chelsea|          38967|        39395|
|170|         Murray Hill|          32572|        32578|
|234|            Union Sq|          30527|        31464|
|113|Greenwich Village...|          26485|        27054|
+---+--------------------+---------------+-------------+
only showing top 5 rows



<h2>Кварталы в порядке убывания количества начатых поездок</h2>

In [24]:
# Zones ordered by the number of trips starting there, busiest first.
df_st_sort = df_zone_count.orderBy(F.col("All_start_trips").desc())
df_st_sort.show(5)

+---+--------------------+---------------+-------------+
| id|                zone|All_start_trips|All_end_trips|
+---+--------------------+---------------+-------------+
| 79|        East Village|          43333|        43170|
| 68|        East Chelsea|          38967|        39395|
|170|         Murray Hill|          32572|        32578|
|234|            Union Sq|          30527|        31464|
|113|Greenwich Village...|          26485|        27054|
+---+--------------------+---------------+-------------+
only showing top 5 rows



<h2>Отображение в виде картограмм</h2>

In [25]:
import folium
from folium.plugins import HeatMap, HeatMapWithTime

In [26]:
# https://github.com/python-visualization/folium/issues/812
def embed_map(m, path=None, width='100%', height='750px'):
    """Render a folium map in the notebook through an IFrame backed by an HTML file.

    Workaround from https://github.com/python-visualization/folium/issues/812.

    Args:
        m: the folium map to render.
        path: HTML file to write. Defaults to a name derived from the map's
            unique element name. Previously every map was written to the same
            ``index.html``, so frames rendered earlier showed whichever map had
            been saved last once they were reloaded.
        width: IFrame width.
        height: IFrame height.

    Returns:
        An ``IPython.display.IFrame`` pointing at the saved file.
    """
    from IPython.display import IFrame
    if path is None:
        path = '{}.html'.format(m.get_name())
    m.save(path)
    return IFrame(path, width=width, height=height)

In [27]:
# Two separate base maps: one for trip starts, one for trip ends.
m, m1 = folium.Map(), folium.Map()

In [28]:
# Collect the per-zone aggregate to the driver as pandas; folium needs local data.
df_count_pn=df_zone_count.toPandas()

<h2>Карта распределения количества начатых поездок</h2>

In [51]:
# Shade each taxi zone by the number of trips that start in it. Zones with
# no matching aggregate row are drawn in faint grey.
start_choropleth = folium.Choropleth(
    geo_data=zones_geojson,
    data=df_count_pn,
    columns=["id", "All_start_trips"],
    key_on="feature.properties.location_id",
    name="Number of start trips",
    legend_name="Number of start trips",
    fill_color="Purples",
    fill_opacity=0.7,
    line_opacity=0.2,
    nan_fill_color="grey",
    nan_fill_opacity=0.1,
    highlight=True,
)
start_choropleth.add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)

<h2>Карта распределения количества завершённых поездок</h2>

In [52]:
# Shade each taxi zone by the number of trips that end in it. Zones with no
# matching aggregate row are drawn in faint grey.
end_choropleth = folium.Choropleth(
    geo_data=zones_geojson,
    data=df_count_pn,
    columns=["id", "All_end_trips"],
    key_on="feature.properties.location_id",
    name="Number of end trips",
    legend_name="Number of end trips",
    fill_color="Purples",
    fill_opacity=0.7,
    line_opacity=0.2,
    nan_fill_color="grey",
    nan_fill_opacity=0.1,
    highlight=True,
)
end_choropleth.add_to(m1)
m1.fit_bounds(m1.get_bounds())
embed_map(m1)

<h2 style="color: #3caa3c">Task 3</h2>

<h2>Вычисление расстояния для каждой поездки</h2>

In [42]:
# One row per trip: both endpoint station ids plus both coordinate pairs,
# renamed to short identifiers (source column -> alias).
trip_endpoint_columns = [
    ("start station id", "id_start"),
    ("end station id", "id_end"),
    ("start station longitude", "lng_st"),
    ("start station latitude", "lat_st"),
    ("end station longitude", "lng_end"),
    ("end station latitude", "lat_end"),
]
df_trips_dist = df_trips.select([F.col(src).alias(dst) for src, dst in trip_endpoint_columns])

df_trips_dist.show(5)

+--------+------+-----------------+-----------------+------------------+-----------------+
|id_start|id_end|           lng_st|           lat_st|           lng_end|          lat_end|
+--------+------+-----------------+-----------------+------------------+-----------------+
|    3494|  3501|         -73.9423|        40.797911|       -73.9442507|       40.8014866|
|     438|   236|     -73.98564945|      40.72779126|      -73.98713956|       40.7284186|
|    3571|  3549|       -73.952918|        40.676368|        -73.962408|        40.678045|
|     167|   477|     -73.97604882|       40.7489006|       -73.9900262|      40.75640548|
|    3458|  3443|-73.9783501625061|40.76309387270797|-73.97982001304626|40.76132983124814|
+--------+------+-----------------+-----------------+------------------+-----------------+
only showing top 5 rows



In [43]:
from math import cos, asin, sqrt

In [44]:
def distance(rows, earth_radius_m=6371 * 1000):
    """Compute the great-circle (haversine) distance in metres for each trip of a partition.

    Meant for ``RDD.mapPartitions``: ``rows`` is an iterable of records with
    ``lat_st``, ``lng_st``, ``lat_end``, ``lng_end`` (degrees) and
    ``id_start``, ``id_end`` fields.

    Trips whose start and end coordinates are both identical (returned to the
    same point) are skipped, as are rows with a missing coordinate.

    Args:
        rows: iterable of trip records (e.g. ``pyspark.sql.Row``).
        earth_radius_m: sphere radius used to turn the central angle into a
            length; defaults to the 6371 km value used before.

    Yields:
        Row(m_dist, id_start, id_end) for every kept trip.
    """
    from math import asin, cos, radians, sqrt

    for row in rows:
        lat0, lat1 = row["lat_st"], row["lat_end"]
        lng0, lng1 = row["lng_st"], row["lng_end"]
        if None in (lat0, lat1, lng0, lng1):
            continue
        # Skip only when BOTH coordinates coincide. The former test (latitude
        # differs AND longitude differs) also dropped genuine trips between
        # two stations that share a latitude or a longitude.
        if lat0 == lat1 and lng0 == lng1:
            continue
        phi0, phi1 = radians(lat0), radians(lat1)
        d_phi = phi0 - phi1
        d_lambda = radians(lng0 - lng1)
        # Haversine formula with hav(x) written as (1 - cos x) / 2.
        a = 0.5 - cos(d_phi) / 2 + cos(phi0) * cos(phi1) * (1 - cos(d_lambda)) / 2
        # Rounding can push `a` slightly outside [0, 1], which would make
        # sqrt/asin raise ValueError; clamp it.
        a = min(1.0, max(0.0, a))
        central_angle = 2 * asin(sqrt(a))
        yield Row(m_dist=float(earth_radius_m * central_angle),
                  id_start=row["id_start"],
                  id_end=row["id_end"])

In [45]:
# Distance in metres for every trip, computed partition by partition.
df_dist = df_trips_dist.rdd.mapPartitions(distance).toDF()
df_dist.show(5)

+------+--------+------------------+
|id_end|id_start|            m_dist|
+------+--------+------------------+
|  3501|    3494|430.16041257402225|
|   236|     438|143.63994354824555|
|  3549|    3571| 821.7251364836256|
|   477|     167|1443.1216859041301|
|  3443|    3458|231.94966562338428|
+------+--------+------------------+
only showing top 5 rows



<h2>Расчёт максимального и среднего значений, стандартного отклонения и медианы расстояния</h2>

In [48]:
# count / mean / stddev / min / max of the trip distance (metres).
df_dist.select("m_dist").describe().show()

+-------+------------------+
|summary|            m_dist|
+-------+------------------+
|  count|            928612|
|   mean|1640.3742164463074|
| stddev| 1288.675635502973|
|    min|36.735871426483946|
|    max|15326.431486590483|
+-------+------------------+



In [50]:
# relativeError=0 makes approxQuantile compute the exact median.
(median,) = df_dist.approxQuantile("m_dist", [0.5], 0)
median

1262.874571344554

<h2 style="color: #3caa3c">Task 4</h2>

In [36]:
# (station, timestamp) pairs for trip starts and for trip ends.
df_med_st_trips = df_trips.selectExpr("`start station id` AS id_start",
                                      "starttime AS st_time")

df_med_end_trips = df_trips.selectExpr("`end station id` AS id_end",
                                       "stoptime AS end_time")

In [37]:
# Calendar day (dd/MM/yyyy) of every trip start / end, per station.
df_med_st_trips.createOrReplaceTempView("trips_date_st")
df_date_st = df_med_st_trips.select(
    "id_start", F.date_format("st_time", "dd/MM/yyyy").alias("day"))

df_med_end_trips.createOrReplaceTempView("trips_date_end")
df_date_end = df_med_end_trips.select(
    "id_end", F.date_format("end_time", "dd/MM/yyyy").alias("day"))

In [38]:
# Number of distinct days on which each station had at least one trip
# start / end.
df_date_st.createOrReplaceTempView("trips_date_st")
df_day_num_st = df_date_st.groupBy("id_start").agg(F.countDistinct("day").alias("num_day"))

df_date_end.createOrReplaceTempView("trips_date_end")
df_day_num_end = df_date_end.groupBy("id_end").agg(F.countDistinct("day").alias("num_day"))

In [41]:
# Trips per station per day, for trip starts and trip ends.
# NOTE(review): despite the "med_" column names (kept for compatibility),
# this is a rounded MEAN over the days on which the station had at least one
# trip (num_day counts distinct active days). It is not a median, and days
# without trips are not in the denominator — confirm this matches Task 4.
df_day_num_st.createOrReplaceTempView("day_num_st")    # active days per start station
df_day_num_end.createOrReplaceTempView("day_num_end")  # active days per end station

df_count_start.createOrReplaceTempView("count_trips_st")   # trip-start totals
df_count_end.createOrReplaceTempView("count_trips_end")    # trip-end totals

# count_trips_st is grouped by (id, lat, lng). A station recorded at several
# positions therefore has several partial rows; sum them per id first, so the
# join yields exactly one row per station.
df_med_st = spark.sql("""
    SELECT ROUND(tr.trips_number_start / d.num_day) AS med_trips_st, d.id_start AS id
    FROM (SELECT id, SUM(trips_number_start) AS trips_number_start
          FROM count_trips_st
          GROUP BY id) tr
    JOIN day_num_st d ON tr.id = d.id_start
""")

df_med_end = spark.sql("""
    SELECT ROUND(tr.trips_number_end / d.num_day) AS med_trips_end, d.id_end AS id
    FROM count_trips_end tr
    JOIN day_num_end d ON tr.id = d.id_end
""")

df_trips_med = df_med_st.join(df_med_end, on="id")
df_trips_med.show(5)

+----+------------+-------------+
|  id|med_trips_st|med_trips_end|
+----+------------+-------------+
| 296|        64.0|         63.0|
|3414|        23.0|         24.0|
|3606|         8.0|          8.0|
| 467|        46.0|         45.0|
|3368|        27.0|         27.0|
+----+------------+-------------+
only showing top 5 rows



In [57]:
#sc.stop