In [6]:
import glob
import os
import shutil

import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [7]:
# Start the Spark application for this notebook, or attach to the one
# that is already running under the same builder configuration.
spark = (
    SparkSession.builder
    .appName('Term_Statistics')
    .getOrCreate()
)

In [8]:
# Load train_hire_stats.csv into a dataframe using an explicit schema
# (the file has a header row and is comma-delimited).
schema = StructType(
    [
        StructField("Zone_ID", ByteType(), False),
        StructField("Date", StringType(), False),
        StructField("Hour_slot", ByteType(), False),
        StructField("Hire_count", ShortType(), False)
    ]
)
train_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .schema(schema)
    .load("data/train_hire_stats.csv")
)

# Weekday number with 1 = Monday ... 7 = Sunday.
# The legacy date_format(..., "u") pattern letter is rejected by the Spark 3
# datetime formatter, so the same numbering is derived from dayofweek(),
# which counts 1 = Sunday ... 7 = Saturday:  ((d + 5) % 7) + 1.
train_df = train_df.withColumn(
    'Day_of_the_week',
    (((F.dayofweek(train_df["Date"]) + 5) % 7) + 1).cast(IntegerType())
)

# Calendar month as an integer (1-12).
train_df = train_df.withColumn(
    'Month',
    F.month(train_df["Date"]).cast(IntegerType())
)

In [9]:
# Load test_hire_stats.csv into a dataframe using an explicit schema
# (the file has a header row and is comma-delimited).
schema = StructType(
    [
        StructField("Test_ID", ShortType(), False),
        StructField("Zone_ID", ByteType(), False),
        StructField("Date", StringType(), False),
        StructField("Hour_slot", ByteType(), False),
        StructField("Hire_count", ByteType(), False)
    ]
)
test_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .schema(schema)
    .load("data/test_hire_stats.csv")
)

# Weekday number with 1 = Monday ... 7 = Sunday.
# The legacy date_format(..., "u") pattern letter is rejected by the Spark 3
# datetime formatter, so the same numbering is derived from dayofweek(),
# which counts 1 = Sunday ... 7 = Saturday:  ((d + 5) % 7) + 1.
test_df = test_df.withColumn(
    'Day_of_the_week',
    (((F.dayofweek(test_df["Date"]) + 5) % 7) + 1).cast(IntegerType())
)

# Calendar month as an integer (1-12).
test_df = test_df.withColumn(
    'Month',
    F.month(test_df["Date"]).cast(IntegerType())
)

In [41]:
# Mean hire count per (zone, weekday, hour slot) over the training data.
# The cast to integer drops the fractional part of the average; the result
# column keeps Spark's default aggregate name "avg(Hire_count)".
compare_df = (
    train_df
    .groupBy("Zone_ID", "Day_of_the_week", "Hour_slot")
    .agg(F.avg("Hire_count").cast(IntegerType()).alias("avg(Hire_count)"))
)

# Same aggregate restricted to the single date 2016-02-01, per zone and hour slot.
# NOTE(review): the variable name suggests Chinese New Year - confirm that
# 2016-02-01 is the intended date.
chinese_ny = (
    train_df
    .filter("Date == '2016-02-01'")
    .groupBy("Zone_ID", "Hour_slot")
    .agg(F.avg("Hire_count").cast(IntegerType()).alias("avg(Hire_count)"))
    .orderBy("Zone_ID", "Hour_slot")
)


In [43]:
# Attach the historical average for the matching (zone, weekday, hour slot)
# to every test row and use it as the predicted Hire_count.
#
# A left join keeps exactly the test rows. The previous full outer join
# followed by "Test_ID is not null" produced the same rows, but hid the intent
# and asked Spark for training-only combinations that were then thrown away.
# Test rows without a matching training group end up with a null Hire_count.
join_keys = ["Zone_ID", "Day_of_the_week", "Hour_slot"]
joined_df = test_df.join(compare_df, join_keys, "left")

final_df = (
    joined_df
    .withColumn("Hire_count", joined_df["avg(Hire_count)"])
    .select("Test_ID", "Zone_ID", "Date", "Hour_slot", "Hire_count")
    .filter("Test_ID is not null")
    .orderBy(F.asc("Test_ID"))
)
final_df.show()

+-------+-------+----------+---------+----------+
|Test_ID|Zone_ID|      Date|Hour_slot|Hire_count|
+-------+-------+----------+---------+----------+
|      0|      7|2017-02-01|        0|         0|
|      1|      7|2017-02-01|        1|         0|
|      2|      7|2017-02-01|        2|         0|
|      3|      7|2017-02-01|        3|         0|
|      4|      7|2017-02-01|        4|         0|
|      5|      7|2017-02-01|        5|         0|
|      6|      7|2017-02-01|        6|         0|
|      7|      7|2017-02-01|        7|         0|
|      8|      7|2017-02-01|        8|         0|
|      9|      7|2017-02-01|        9|         0|
|     10|      7|2017-02-01|       10|         0|
|     11|      7|2017-02-01|       11|         0|
|     12|      7|2017-02-01|       12|         0|
|     13|      7|2017-02-01|       13|         0|
|     14|      7|2017-02-01|       14|         0|
|     15|      7|2017-02-01|       15|         0|
|     16|      7|2017-02-01|       16|         0|


In [23]:
# Write the predictions to CSV.
## Spark writes one part file per partition into the target directory, so the
## part files are concatenated, in file-name order, into a single CSV placed
## next to that directory.
output_dir = 'output/attempts/first-attempt'
merged_csv = output_dir + '.csv'

final_df.write.mode("overwrite").csv(output_dir)

# Opening in 'wb' mode truncates a merged file left over from an earlier run,
# so no separate delete step is needed. The 'p*' pattern selects the part
# files and skips Spark's marker/checksum files (_SUCCESS, .*.crc).
with open(merged_csv, 'wb') as merged:
    for part_path in sorted(glob.glob(os.path.join(output_dir, 'p*'))):
        with open(part_path, 'rb') as part:
            shutil.copyfileobj(part, merged)

0

In [59]:
# Inspect the 2016-02-01 averages for zone 14 during hour slots 0-10.
(
    chinese_ny
    .filter("Zone_ID == 14 AND Hour_slot <= 10")
    .select("Zone_ID", "Hour_slot", "avg(Hire_count)")
    .show()
)

+-------+---------+---------------+
|Zone_ID|Hour_slot|avg(Hire_count)|
+-------+---------+---------------+
|     14|        0|              0|
|     14|        1|              0|
|     14|        2|              0|
|     14|        3|              0|
|     14|        4|              0|
|     14|        5|              0|
|     14|        6|              0|
|     14|        7|              0|
|     14|        8|             12|
|     14|        9|              8|
|     14|       10|              3|
+-------+---------+---------------+

