# Setup

Installed pip and Anaconda with a Python 2.7 environment named `python2` (for PySpark compatibility reasons). Added NumPy and the Python Kafka client (`kafka-python`) to this Python installation through pip. Connected Anaconda and Jupyter to the PySpark environment by setting the following paths and variables:

export SPARK_HOME=/usr/lib/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=/home/cloudera/anaconda3/envs/python2/bin/python2.7

With these variables set, running `pyspark` launches a Jupyter notebook as the PySpark driver instead of the interactive shell.

# Simulated Data Pipeline: Data Preparation

Since the data used does not come from a real-time stream, historical trip record files were downloaded from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml. The CSV files need to be placed on the Desktop in a folder called 'Data' (`/home/cloudera/Desktop/Data`). Once this step is complete, no further directories need to be created.

The following script pre-slices the files into smaller CSV files, each covering about 5 minutes of data. The yellow cab service has a higher ride frequency, so the slicing has to be adjusted to obtain a similar pickup time frame per slice. Yellow files are split into chunks of 1,000 rows and green files into chunks of 100 rows. During testing, slicing a green taxi data set took around 1 minute, while slicing a yellow taxi data set took longer. Later on, the generated CSV chunks are picked up again and used for Kafka messaging.

This script only needs to be executed once. It can be re-run whenever new monthly data sets are published.
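The script hard-codes 1,000 and 100 rows per chunk. As an alternative, the chunk size for a roughly five-minute slice could be estimated from the ride frequency in a sample of the data. The sketch below assumes the pickup timestamps are available as strings or datetimes, e.g. the `tpep_pickup_datetime` column of a yellow taxi file. `rows_per_window` is a hypothetical helper and is not part of the original scripts.

```python
import pandas as pd


def rows_per_window(pickup_times, minutes=5):
    """Hypothetical helper: median number of rides per `minutes`-long window.

    pickup_times: iterable of timestamps (strings or datetimes) for one taxi service.
    """
    # one entry per ride, indexed by its pickup time
    rides = pd.Series(1, index=pd.to_datetime(pickup_times)).sort_index()
    # count the rides in consecutive fixed-size time windows
    counts = rides.resample('%dmin' % minutes).sum()
    # the median is robust against quiet or busy outlier windows
    return int(counts.median())


# usage sketch: two rides per minute, so ten rows cover five minutes
sample = pd.date_range('2017-01-01 00:00', periods=120, freq='30s')
print(rows_per_window(sample))  # prints 10
```

Applied to a real month of data, the result would replace the fixed `size` values in `split_data`.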

In [3]:
import pandas as pd
from os import listdir, path, makedirs
from os.path import isfile, join


class FileSlicer:
    """
    This class is responsible for slicing the CSV-files containing the taxi ride records
    into smaller elements, containing around 5 minutes of data.
    """
    def __init__(self):
        """
        Setup of the class structure and execution of the essential class methods, to split the
        data sheets into smaller chunks.
        """
        self.filesList = []
        self.get_files()
        self.split_data()

    def get_files(self):
        """
        Creation of a list of all CSV files in the Desktop's 'Data' folder, together with the
        taxi color, year and month parsed from each file name, for iteration purposes.
        """
        filesList = [f for f in listdir('/home/cloudera/Desktop/Data')
                     if isfile(join('/home/cloudera/Desktop/Data', f)) and '.csv' in f]

        for i in filesList:
            splitString = i.split('_')
            splitDate = splitString[2].split('-')
            self.filesList.append(['/home/cloudera/Desktop/Data/' + i,
                                   splitString[0], splitDate[0], splitDate[1][:2]])

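    def split_data(self):
        """
        Adjustment for the different data sets and preparation of the iteration for the
        slicing of the data: yellow files are read in chunks of 1000 rows, green files in
        chunks of 100 rows.
        """
        for i in self.filesList:
            if i[1] == "yellow":
                size = 1000
            elif i[1] == "green":
                size = 100
            else:
                # no chunk size is defined for other data sets (e.g. FHV); chunksize=0 would fail
                continue
            rows = pd.read_csv(i[0], header = 'infer', chunksize = size)
            self.slice_output(rows, i[1], i[2], i[3])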

    def slice_output(self, data, color, year, month):
        """
        Actual slicing of the files into smaller chunks and placement of just those into a new folder
        'Chunks' on the Desktop, with each of the chunks which once belonged to the same file being
        in the same subfolder named after the originating file.
        """
        counter = 0
        newpath = '/home/cloudera/Desktop/Chunks/' \
                  + color + "_" + year + "_" + month
        if not path.exists(newpath):
            makedirs(newpath)
        for chunk in data:
            chunk.to_csv(newpath + '/_' + str(counter) + ".csv", sep = '\t')
            counter += 1


if __name__ == "__main__":
    slicer = FileSlicer()
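`get_files` splits each file name on `_` and `-`, so it assumes the TLC naming scheme `yellow_tripdata_2017-01.csv`. A file with any other name causes an error. A slightly more defensive variant could match the names with a regular expression. `parse_trip_filename` and the pattern below are hypothetical and not part of the original script. They return the color, year and month, or `None` for files that do not follow the assumed scheme.

```python
import re

# assumed TLC naming scheme: <color>_tripdata_<YYYY>-<MM>.csv
TRIP_FILE = re.compile(r'^(?P<color>[a-z]+)_tripdata_(?P<year>\d{4})-(?P<month>\d{2})\.csv$')


def parse_trip_filename(name):
    """Return (color, year, month) for a trip-record file name, or None if it does not match."""
    match = TRIP_FILE.match(name)
    if match is None:
        return None
    return match.group('color'), match.group('year'), match.group('month')


print(parse_trip_filename('yellow_tripdata_2017-01.csv'))  # ('yellow', '2017', '01')
print(parse_trip_filename('notes.txt'))                    # None
```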

# Simulated Data Pipeline: Data Slices to KafkaProducer

In the following script, the generated data chunks are picked up from the 'Chunks' directory created earlier on the Desktop. A folder dialog asks which chunk folder, and thus which original file, should be streamed. Picking up a data chunk and sending its rows through a Kafka producer is repeated every five minutes. This simulates a real-time data stream and matches the time frame covered by each data chunk.
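The five-minute cadence boils down to a small, testable pattern. After each batch, the next one is scheduled for the period minus the time the batch itself took, so the batches do not drift. The sketch uses the standard `sched` module like the script does. `run_every` is a hypothetical helper, and the short period in the usage example only keeps the demonstration fast.

```python
import sched
import time


def run_every(period, task, n_batches):
    """Hypothetical helper: call task() n_batches times, starting a batch every `period` seconds."""
    scheduler = sched.scheduler(time.time, time.sleep)
    start_times = []

    def batch(remaining):
        started = time.time()
        start_times.append(started)
        task()
        if remaining > 1:
            # subtract the time the task itself took so the start times do not drift
            elapsed = time.time() - started
            scheduler.enter(max(0, period - elapsed), 1, batch, (remaining - 1,))

    scheduler.enter(0, 1, batch, (n_batches,))
    scheduler.run()
    return start_times


# usage sketch: three batches, 0.05 s apart, each doing 0.01 s of "work"
times = run_every(0.05, lambda: time.sleep(0.01), 3)
print([round(b - a, 2) for a, b in zip(times, times[1:])])
```

In the notebook, `period` would be 300 seconds and `task` the streamer's `kafkaStream` method.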

In [None]:
from kafka import KafkaProducer
from os import listdir
from os.path import isfile, join
from datetime import datetime
import Tkinter, tkFileDialog, tkMessageBox
import pandas as pd
import sched, time
import sys


class Streamer:
    """
    Execution of a script every five minutes, as this is approximately the time frame
    of data that every data chunk contains.
    """
    def __init__(self):
        """
        Setting-up the instance variables and calling on the initiation method.
        """
        self.filesList = []
        self.chunk_counter = 0
        # KafkaProducer replaces the deprecated SimpleProducer/KafkaClient pair of kafka-python
        self.producer = KafkaProducer(bootstrap_servers="localhost:9092")
        self.setup()

    def setup(self):
        """
        Selection of the chunk folder belonging to the original file which is supposed to be
        streamed, followed by the collection and numerical sorting of all data chunks it contains.
        """
        root = Tkinter.Tk()
        root.withdraw()
        self.file_path = tkFileDialog.askdirectory()

        filesList = [f for f in listdir(self.file_path)
                     if isfile(join(self.file_path, f)) and f.endswith('.csv')]

        # chunk files are named _<number>.csv; sort them numerically, not alphabetically
        numberList = [f[f.find("_")+1:f.find(".")] for f in filesList]
        numberList.sort(key=int)

        for i in numberList:
            self.filesList.append(join(self.file_path, "_" + i + ".csv"))

    def kafkaStream(self):
        """
        Reading of the next data chunk .csv-file into a Pandas dataframe. Every row is
        converted into a dictionary of column header -> value, serialized as JSON and
        sent through the Kafka Producer to the topic 'TaxiData'.
        """
        if self.chunk_counter >= len(self.filesList):
            self.producer.flush()
            tkMessageBox.showinfo("Streaming completed",
                                  "The streaming of the file (%s) has been completed." % self.file_path)
            sys.exit()

        # FileSlicer wrote the chunks tab-separated with the row index in the first column;
        # dtype=str keeps every value as text, as in the original string-based messages
        dataframe = pd.read_csv(self.filesList[self.chunk_counter], sep='\t',
                                index_col=0, dtype=str)

        for _, row in dataframe.iterrows():
            message = row.to_json()  # one JSON object per taxi ride
            self.producer.send("TaxiData", message.encode('utf-8'))

        self.chunk_counter += 1


def automate_kafkaProducer(batch):
    """
    Calling on the respective method inside the Streamer class in order to send the
    data through the Kafka Producer. The next call is scheduled 5 minutes after the
    start of the current one (minus the time the batch took) to simulate a real-time
    data stream.
    """
    batch_start_time = datetime.now()
    stream.kafkaStream()
    batch_duration = (datetime.now() - batch_start_time).total_seconds()
    batch.enter(max(0, 300 - batch_duration), 1, automate_kafkaProducer, (batch,))


if __name__ == "__main__":
    stream = Streamer()
    batch = sched.scheduler(time.time, time.sleep)
    batch.enter(0, 1, automate_kafkaProducer, (batch,))
    batch.run()
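The `kafkaStream` docstring describes zipping the header with each data row into a dictionary and serializing it to JSON. That step can be sketched on its own, without Kafka, as below. `rows_to_messages` is a hypothetical helper. Every value is kept as a string, mirroring the text-based messages of the original script.

```python
import json

import pandas as pd


def rows_to_messages(dataframe):
    """Hypothetical helper: turn every row into a JSON object {column header: value}."""
    header = [str(column) for column in dataframe.columns]
    messages = []
    for row in dataframe.itertuples(index=False):
        # zip the header with the row's values, converting each value to text
        record = dict(zip(header, [str(value) for value in row]))
        messages.append(json.dumps(record))
    return messages


chunk = pd.DataFrame({'trip_distance': [1.2, 3.4], 'total_amount': [8.3, 15.8]})
for message in rows_to_messages(chunk):
    print(message)
```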

# Receiving Data with the Kafka Consumer

The messages sent by the Kafka producer are received inside Spark, using PySpark to set up a Kafka consumer. The data rows are then meant to be read into a Spark data frame and saved to HDFS. As noted in the code, this step could not be completed on the incoming stream, so the remaining steps are run on a sliced CSV file. The data frame is reduced to the columns needed for the model, and some columns are renamed. Finally, the data frame is converted to a Pandas data frame for further analysis in the data model.
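The steps described above can be mirrored in plain pandas for testing without a running Spark or Kafka instance: parse the JSON messages, rename the location columns, extract the pickup hour and keep only the modelling columns. This is a sketch and not the Spark code used in this notebook. `messages_to_frame` is hypothetical and assumes the messages carry the yellow taxi column names.

```python
import json

import pandas as pd

# renaming as in the Spark code below
RENAMES = {'PULocationID': 'pu_id', 'DOLocationID': 'do_id'}


def messages_to_frame(messages):
    """Hypothetical helper: parse JSON ride messages and keep the modelling columns."""
    frame = pd.DataFrame([json.loads(message) for message in messages])
    frame = frame.rename(columns=RENAMES)
    pickup = pd.to_datetime(frame['tpep_pickup_datetime'])
    selection = frame[['pu_id', 'do_id']].copy()
    selection['trip_distance'] = frame['trip_distance'].astype(float)
    selection['total_amount'] = frame['total_amount'].astype(float)
    selection['hour'] = pickup.dt.hour  # same role as Spark's hour() function
    return selection


messages = [
    '{"VendorID": "1", "PULocationID": "161", "DOLocationID": "236", '
    '"tpep_pickup_datetime": "2017-01-01 08:15:00", "trip_distance": "2.1", "total_amount": "12.3"}',
    '{"VendorID": "2", "PULocationID": "48", "DOLocationID": "142", '
    '"tpep_pickup_datetime": "2017-01-01 17:40:00", "trip_distance": "3.0", "total_amount": "17.7"}',
]
print(messages_to_frame(messages))
```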

In [1]:
from kafka import KafkaConsumer
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour

#========================================================================================================#

#getOrCreate() reuses the SparkContext that the pyspark launcher already provides as `sc`
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

#1)
#If you have Spark 2.2+, Structured Streaming is supported, so the Kafka stream can directly be consumed as a Spark DataFrame
#(requires the spark-sql-kafka-0-10 package and the SparkSession `spark` provided by the pyspark shell)
#Construct a streaming DataFrame that reads from the topic TaxiData - uncomment to use
#df = spark.readStream \
#  .format("kafka").option("kafka.bootstrap.servers", "quickstart.cloudera:9092").option("subscribe", "TaxiData") \
#  .option("startingOffsets", "earliest").load()

#2)
#This would have been the approach if we implemented a Spark Streaming consumer - uncomment all following lines to use it
#from pyspark.streaming import StreamingContext
#from pyspark.streaming.kafka import KafkaUtils
#ssc = StreamingContext(sc, 10)
#kafkastream = KafkaUtils.createDirectStream(ssc, topics = ['TaxiData'],
#                                            kafkaParams = {"metadata.broker.list": 'quickstart.cloudera:9092'})

#3)
#This is the KafkaConsumer from the kafka-python library - uncomment the loop to see the messages appear in the Jupyter window
consumer = KafkaConsumer('TaxiData', fetch_max_bytes = 2048, auto_offset_reset = 'earliest',
                         enable_auto_commit = False, bootstrap_servers = ['quickstart.cloudera:9092'])

#for m in consumer:
#    print(m)

#We could not store the received data into dataframes:
#performance issues made it hard to test code on incoming data.
#The intended next steps were to parse the received JSON messages and stream them to HDFS, e.g.
#df = sqlContext.read.json(sc.parallelize(messages))   #messages: list of JSON strings taken from the consumer
#df.write.format("csv").save("/TaxiData/" + str(counter))   #counter: number of the current batch
 
#============================================================================================================#
#df = sqlContext.read.json("TEST2.csv") - use this command if you work with JSON files


#Start pyspark with the following command: $ pyspark --packages com.databricks:spark-csv_2.11:1.4.0
#The chunks written by FileSlicer are tab-separated, hence delimiter='\t'
df = sqlContext.read.load('_0.csv', format='com.databricks.spark.csv', header='true',
                          inferSchema='true', delimiter='\t')
#check the types of the inferred schema - check if the datatypes are correct
print(df.dtypes)
#make sure tpep_pickup_datetime is a timestamp! (green taxi files name this column lpep_pickup_datetime)


df = df.withColumnRenamed('PULocationID', 'pu_id').withColumnRenamed('DOLocationID', 'do_id')
#df = df.withColumn('tpep_pickup_datetime', df.tpep_pickup_datetime.cast('timestamp'))
#use the last line if you have to cast the datatype

#this is the data relevant for modelling; the hour is extracted from the pickup datetime, as we assume it
#to be relevant for possible delays in traffic
df_selection = df.select('pu_id', 'do_id', 'trip_distance', 'total_amount', hour('tpep_pickup_datetime').alias('hour'))

#The reduced data is transformed into a pandas dataframe for further modelling
df_pandas = df_selection.toPandas()

# Data Model

The data model is implemented using Pandas data frames and the scikit-learn library. First, a dummy variable is created from the suspected rush hour times (8:00 to 10:00 and 16:00 to 20:00) and the night-shift hours (0:00 to 6:00). Its purpose is to evaluate how strongly these times affect fare pricing. Then the dependent and independent variables are assigned and the training and test data sets are defined. Finally, a linear regression is fitted with scikit-learn.
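The regression described here has the form total_amount ≈ b0 + b1·trip_distance + b2·hourdummy. scikit-learn's `LinearRegression` estimates such coefficients by ordinary least squares. The sketch below shows two things on synthetic fares with known coefficients. It builds the dummy without an explicit loop using `isin`, and it recovers the coefficients with NumPy's `lstsq`, which stands in for scikit-learn here. `hour_dummy` and `fit_ols` are hypothetical helpers.

```python
import numpy as np
import pandas as pd

# rush hour (8-10, 16-20) and night shift (0-6), the same windows as the loop below
FLAGGED_HOURS = set(range(0, 6)) | {8, 9} | set(range(16, 20))


def hour_dummy(hours):
    """Hypothetical helper: 1 if an hour falls into one of the flagged windows, else 0."""
    return pd.Series(hours).isin(FLAGGED_HOURS).astype(int)


def fit_ols(X, y):
    """Hypothetical helper: least squares with an intercept; returns (intercept, slopes)."""
    design = np.column_stack([np.ones(len(X)), X])
    beta = np.linalg.lstsq(design, y, rcond=None)[0]
    return beta[0], beta[1:]


hours = [0, 7, 8, 12, 17, 23]
distance = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
dummy = hour_dummy(hours).values
fare = 2.5 + 3.0 * distance + 4.0 * dummy  # synthetic fares with known coefficients
intercept, slopes = fit_ols(np.column_stack([distance, dummy]), fare)
print(intercept, slopes)
```

On real data the fit is not exact. The coefficient of `hourdummy` then estimates the average fare difference during flagged hours at equal trip distance.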

In [None]:
from __future__ import print_function
import pandas as pd, numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error


#extract the hour values and transform them into a dummy: 1 for rush hour (8-10, 16-20) and night shift (0-6), else 0
hour_values = df_pandas["hour"].values

rush_hour_dummy = []
for i in hour_values:
    if 16 <= i < 20 or 8 <= i < 10 or 0 <= i < 6:
        rush_hour_dummy.append(1)
    else:
        rush_hour_dummy.append(0)

series_dummy = pd.Series(rush_hour_dummy, name='hourdummy')

#the set of independent variables is saved in df_X and the price we want to estimate in df_Y
df_pandas2 = pd.concat([df_pandas, series_dummy], axis=1)

df_X = df_pandas2[["trip_distance","hourdummy"]]
df_Y = df_pandas[["total_amount"]]


#Split the data into training/testing sets
#source: http://scikit-learn.org/stable/auto_examples/linear_model/plot_ols.html
X_train = df_X[:-100]
X_test = df_X[-100:]

# Split the targets into training/testing sets
Y_train = df_Y[:-100]
Y_test = df_Y[-100:]


lr = LinearRegression()

lr.fit(X_train, Y_train)
print('Coefficients: \n', lr.coef_)

#predicts the Y-values using the testing set
Y_pred = lr.predict(X_test)

# The mean squared error - compares the predicted y-values with the real values of the test-data
print("Mean squared error: %.2f"
      % mean_squared_error(Y_test, Y_pred))
# Coefficient of determination (R^2): 1 is a perfect prediction
print('R^2 score: %.2f' % r2_score(Y_test, Y_pred))

# User Interface

We implemented a simple user interface using tkinter...

In [None]:
#Put interface in here