In [27]:
# Imports and SparkContext bootstrap for the notebook.
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
# Reuse the active SparkContext if there is one; otherwise start one on the
# "local[*]" master (local mode, one worker thread per available core).
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
from pyspark.mllib.stat import Statistics
# NOTE(review): duplicate of the SparkSession import a few lines up.
from pyspark.sql import SparkSession
# NOTE(review): wildcard import; the cells visible here only use `col` from it.
# pyspark.sql.functions also exports names such as `sum`, `min` and `max`, which
# replace the Python builtins of the same name in this namespace - prefer
# `from pyspark.sql import functions as F` if this cell is ever reworked.
from pyspark.sql.functions import *
# pyarrow is used further down to write the joined result to a Parquet file.
import pyarrow as pa
import pyarrow.parquet as pq

In [3]:
# Obtain the notebook's SparkSession; getOrCreate() hands back the session that
# is already active, or builds one. No extra builder options are set here.
spark = SparkSession.builder.getOrCreate()

In [10]:
# Load the two Parquet inputs: the weather table and the per-trip duration table.
weather = spark.read.format("parquet").load("../data/weather-data.parquet")
duration = spark.read.format("parquet").load("../data/duration-data.parquet")

In [11]:
# Sanity check: number of rows in the weather table.
weather.count()

986

In [12]:
# Sanity check: number of rows (individual trips) in the duration table.
duration.count()

8343849

In [13]:
# Rename the weather date column to `trip_date`, the key the trip aggregates are
# joined on later, then preview the first rows.
weather = weather.withColumnRenamed(existing="DATE", new="trip_date")
weather.show(n=5)

+----------+-----+----+----+----+----+----+----+
| trip_date| AWND|PRCP|SNOW|SNWD|TAVG|TMAX|TMIN|
+----------+-----+----+----+----+----+----+----+
|2018-01-01|11.63| 0.0| 0.0| 0.0|19.0|  26|  13|
|2018-01-02| 7.83| 0.0| 0.0| 0.0|18.0|  26|  13|
|2018-01-03|  3.8| 0.0| 0.0| 0.0|23.0|  35|  13|
|2018-01-04|19.91| 0.1| 0.8| 1.2|26.0|  27|  16|
|2018-01-05|15.66| 0.0| 0.0| 1.2|17.0|  21|  13|
+----------+-----+----+----+----+----+----+----+
only showing top 5 rows



In [14]:
# Preview the raw trip records before any derived columns are added.
duration.show(n=5)

+-------------------+-------------------+----------------+-----------------+
|         trip_start|           trip_end|duration_seconds|__index_level_0__|
+-------------------+-------------------+----------------+-----------------+
|2017-12-31 19:05:06|2017-12-31 19:14:18|           552.0|                0|
|2017-12-31 19:14:30|2017-12-31 19:35:53|          1283.0|                1|
|2017-12-31 19:14:53|2017-12-31 19:35:58|          1265.0|                2|
|2017-12-31 19:15:31|2017-12-31 19:25:09|           578.0|                3|
|2017-12-31 19:18:02|2017-12-31 19:24:15|           373.0|                4|
+-------------------+-------------------+----------------+-----------------+
only showing top 5 rows



In [15]:
# Derive the calendar day each trip started on; this becomes the grouping and
# join key for the daily aggregates.
# NOTE(review): casting a timestamp to a date uses the Spark session time zone.
# The trip preview shows rows starting on the evening of 2017-12-31, while the
# joined result further down begins at 2018-01-01 - confirm the timestamps are
# not shifted (e.g. local times read back as UTC) before trusting the per-day
# buckets.
duration = duration.withColumn(
    colName="trip_date",
    col=duration.trip_start.cast("date"),
)

In [16]:
# Preview again to confirm the new `trip_date` column was added.
duration.show(n=5)

+-------------------+-------------------+----------------+-----------------+----------+
|         trip_start|           trip_end|duration_seconds|__index_level_0__| trip_date|
+-------------------+-------------------+----------------+-----------------+----------+
|2017-12-31 19:05:06|2017-12-31 19:14:18|           552.0|                0|2017-12-31|
|2017-12-31 19:14:30|2017-12-31 19:35:53|          1283.0|                1|2017-12-31|
|2017-12-31 19:14:53|2017-12-31 19:35:58|          1265.0|                2|2017-12-31|
|2017-12-31 19:15:31|2017-12-31 19:25:09|           578.0|                3|2017-12-31|
|2017-12-31 19:18:02|2017-12-31 19:24:15|           373.0|                4|2017-12-31|
+-------------------+-------------------+----------------+-----------------+----------+
only showing top 5 rows



In [17]:
# Expose the trip table to Spark SQL under the view name `dv`.
duration.createOrReplaceTempView('dv')
# Aggregate to one row per trip_date: the number of trips and the summed
# duration_seconds for that day, sorted chronologically.
trips = spark.sql("""
                  SELECT trip_date, count(*) as trip_count, sum(duration_seconds) as total_seconds
                  FROM dv
                  GROUP BY trip_date
                  ORDER BY trip_date
                  """)
# Preview the first 10 daily aggregates.
trips.show(10)

+----------+----------+-------------+
| trip_date|trip_count|total_seconds|
+----------+----------+-------------+
|2017-12-31|       141|     146743.0|
|2018-01-01|      1099|    1057878.0|
|2018-01-02|      3796|    2840648.0|
|2018-01-03|      4742|    3342920.0|
|2018-01-04|      2359|    1373034.0|
|2018-01-05|      2735|    1643926.0|
|2018-01-06|      1510|     965617.0|
|2018-01-07|      1719|    1153907.0|
|2018-01-08|      4099|    2725381.0|
|2018-01-09|      6119|    4649579.0|
+----------+----------+-------------+
only showing top 10 rows



In [22]:
# Attach each day's weather to its trip totals. The inner join keeps only dates
# present in both tables, so the 2017-12-31 trip row seen in the daily
# aggregates does not appear in the joined preview.
dat = (
    trips
    .join(other=weather, on=["trip_date"], how="inner")
    .sort(col("trip_date"))
)
dat.show(n=10)

+----------+----------+-------------+-----+----+----+----+----+----+----+
| trip_date|trip_count|total_seconds| AWND|PRCP|SNOW|SNWD|TAVG|TMAX|TMIN|
+----------+----------+-------------+-----+----+----+----+----+----+----+
|2018-01-01|      1099|    1057878.0|11.63| 0.0| 0.0| 0.0|19.0|  26|  13|
|2018-01-02|      3796|    2840648.0| 7.83| 0.0| 0.0| 0.0|18.0|  26|  13|
|2018-01-03|      4742|    3342920.0|  3.8| 0.0| 0.0| 0.0|23.0|  35|  13|
|2018-01-04|      2359|    1373034.0|19.91| 0.1| 0.8| 1.2|26.0|  27|  16|
|2018-01-05|      2735|    1643926.0|15.66| 0.0| 0.0| 1.2|17.0|  21|  13|
|2018-01-06|      1510|     965617.0|13.42| 0.0| 0.0| 0.0|15.0|  21|  11|
|2018-01-07|      1719|    1153907.0| 8.95| 0.0| 0.0| 0.0|15.0|  23|   8|
|2018-01-08|      4099|    2725381.0|  8.5|0.06| 0.0| 0.0|25.0|  39|  19|
|2018-01-09|      6119|    4649579.0| 5.37| 0.0| 0.0| 0.0|38.0|  51|  30|
|2018-01-10|      7109|    5293373.0| 3.36| 0.0| 0.0| 0.0|36.0|  43|  27|
+----------+----------+-------------+-----+----+----+----+----+----+----+

In [30]:
# Persist the joined daily table as a single Parquet file via pandas + pyarrow.
# toPandas() collects the entire Spark result onto the driver; that is fine here
# because the table holds one row per day. The former `select('*')` was a no-op
# projection and has been dropped.
result_dat = dat.toPandas()
# preserve_index=False keeps the pandas index out of the file, so readers do not
# get an extra index column (compare `__index_level_0__` in the duration input).
result_table = pa.Table.from_pandas(result_dat, preserve_index=False)
pq.write_table(result_table, '../data/bikeshare-data.parquet')