In [133]:
# SETUP CODE
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [134]:
# SETUP CODE
# Export the JDK and Spark install locations as environment variables.
import os

os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [135]:
# SETUP CODE
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session running in local mode.
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
sc = spark.sparkContext
# Keep the session as the cell's last expression so the notebook renders it;
# a bare expression in the middle of a cell is evaluated and then discarded.
spark

In [136]:
# MOUNT Google Drive
# Mount Google Drive inside the Colab VM at the path below.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [137]:
#TEST code
from pyspark import SparkContext, SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import lit

# Single sample file used to try the pipeline; the data files are expected in a
# Drive folder named "csvdata".
# NOTE(review): `input` shadows the Python builtin, but later cells read this name.
input = "/content/drive/MyDrive/csvdata/detail_record_2017_01_02_08_00_00"
# Read the file as an RDD of text lines.
text_file = sc.textFile(input)
# SQL entry point bound to the running SparkContext.
sqlContext = SQLContext(sc)


In [141]:
#TEST code
# Column names, in the order the comma-separated fields appear on each input line.
column_name = "driverID,carPlateNumber,Latitude,Longitude,Speed,Direction,siteName,Time,isRapidlySpeedup,isRapidlySlowdown,isNeutralSlide,isNeutralSlideFinished,neutralSlideTime,isOverspeed,isOverspeedFinished,overspeedTime,isFatigueDriving,isHthrottleStop,isOilLeak"
# Number of values every parsed row must supply (one per column name).
column_count = len(column_name.split(","))

# Split each line on commas and keep only lines with more than 8 fields.
counts = text_file.map(lambda line: line.split(",")).filter(lambda line: len(line)>8)
# Pad short rows with empty strings and cut long ones so each Row carries exactly
# column_count values. Indexing p[0]..p[18] directly raised IndexError for any row
# that passed the filter with fewer than 19 fields.
column_data = counts.map(lambda p: Row(*(p + [""] * column_count)[:column_count]))

# Per (driverID, day, hour) group: take the first recordID / driverID / plate / Time
# and sum each behaviour column. Runs against the temp table named "summary".
sql = "SELECT first(recordID),first(driverID),first(carPlateNumber),first(Time) \
                              as recordDAY,HOUR(Time) as recordHOUR,\
                              sum(isRapidlySpeedup),sum(isRapidlySlowdown),sum(isNeutralSlide),sum(isNeutralSlideFinished),\
                              sum(neutralSlideTime),sum(isOverspeed),sum(isOverspeedFinished),sum(overspeedTime),sum(isFatigueDriving),\
                              sum(isHthrottleStop),sum(isOilLeak) \
                              FROM summary \
                              GROUP BY driverID,DAY(Time),HOUR(Time)"
# Every column is declared as a nullable string.
fields = [StructField(field_name, StringType(), True) for field_name in column_name.split(",")]
schema = StructType(fields)


In [142]:
#TEST code
# Turn the parsed rows into a DataFrame using the column schema.
dataframe = sqlContext.createDataFrame(column_data, schema=schema)
dataframe

driverID,carPlateNumber,Latitude,Longitude,Speed,Direction,siteName,Time,isRapidlySpeedup,isRapidlySlowdown,isNeutralSlide,isNeutralSlideFinished,neutralSlideTime,isOverspeed,isOverspeedFinished,overspeedTime,isFatigueDriving,isHthrottleStop,isOilLeak
likun1000003,华AVM936,32.056444,118.777589,72,211,,2017-01-01 08:00:05,,,,,,,,,1.0,,
haowei1000008,华A709GB,30.6786,104.070835,143,115,,2017-01-01 08:00:08,,,,,,1.0,,,,,
haowei1000008,华A709GB,30.6786,104.070835,120,115,,2017-01-01 08:00:08,,,,,,,1.0,0.0,,,
zouan1000007,华A58M83,28.210856,112.979521,139,177,,2017-01-01 08:00:12,,,,,,1.0,,,,,
duxu1000009,华AT75H8,38.500677,106.210269,137,118,,2017-01-01 08:00:13,,,,,,1.0,,,,,
duxu1000009,华AT75H8,38.500517,106.210581,120,118,,2017-01-01 08:00:14,,,,,,,1.0,1.0,,,
zouan1000007,华A58M83,28.207153,112.97976,131,177,,2017-01-01 08:00:24,,1.0,,,,,,,,,
haowei1000008,华A709GB,30.675895,104.076702,130,115,,2017-01-01 08:00:29,,,,,,1.0,,,,,
haowei1000008,华A709GB,30.675895,104.076702,120,115,,2017-01-01 08:00:29,,,,,,,1.0,0.0,,,
duxu1000009,华AT75H8,38.498514,106.214488,105,118,,2017-01-01 08:00:29,,,,,,,,,,1.0,


In [143]:
#TEST code
# Tag every row with the path of the file it was read from.
dataframe = dataframe.withColumn(colName="recordID", col=lit(input))
dataframe

driverID,carPlateNumber,Latitude,Longitude,Speed,Direction,siteName,Time,isRapidlySpeedup,isRapidlySlowdown,isNeutralSlide,isNeutralSlideFinished,neutralSlideTime,isOverspeed,isOverspeedFinished,overspeedTime,isFatigueDriving,isHthrottleStop,isOilLeak,recordID
likun1000003,华AVM936,32.056444,118.777589,72,211,,2017-01-01 08:00:05,,,,,,,,,1.0,,,/content/drive/My...
haowei1000008,华A709GB,30.6786,104.070835,143,115,,2017-01-01 08:00:08,,,,,,1.0,,,,,,/content/drive/My...
haowei1000008,华A709GB,30.6786,104.070835,120,115,,2017-01-01 08:00:08,,,,,,,1.0,0.0,,,,/content/drive/My...
zouan1000007,华A58M83,28.210856,112.979521,139,177,,2017-01-01 08:00:12,,,,,,1.0,,,,,,/content/drive/My...
duxu1000009,华AT75H8,38.500677,106.210269,137,118,,2017-01-01 08:00:13,,,,,,1.0,,,,,,/content/drive/My...
duxu1000009,华AT75H8,38.500517,106.210581,120,118,,2017-01-01 08:00:14,,,,,,,1.0,1.0,,,,/content/drive/My...
zouan1000007,华A58M83,28.207153,112.97976,131,177,,2017-01-01 08:00:24,,1.0,,,,,,,,,,/content/drive/My...
haowei1000008,华A709GB,30.675895,104.076702,130,115,,2017-01-01 08:00:29,,,,,,1.0,,,,,,/content/drive/My...
haowei1000008,华A709GB,30.675895,104.076702,120,115,,2017-01-01 08:00:29,,,,,,,1.0,0.0,,,,/content/drive/My...
duxu1000009,华AT75H8,38.498514,106.214488,105,118,,2017-01-01 08:00:29,,,,,,,,,,1.0,,/content/drive/My...


In [144]:
#TEST code
# Expose the DataFrame to SQL under the name "summary".
# createOrReplaceTempView replaces registerTempTable, which is deprecated.
dataframe.createOrReplaceTempView("summary")
dataframe

driverID,carPlateNumber,Latitude,Longitude,Speed,Direction,siteName,Time,isRapidlySpeedup,isRapidlySlowdown,isNeutralSlide,isNeutralSlideFinished,neutralSlideTime,isOverspeed,isOverspeedFinished,overspeedTime,isFatigueDriving,isHthrottleStop,isOilLeak,recordID
likun1000003,华AVM936,32.056444,118.777589,72,211,,2017-01-01 08:00:05,,,,,,,,,1.0,,,/content/drive/My...
haowei1000008,华A709GB,30.6786,104.070835,143,115,,2017-01-01 08:00:08,,,,,,1.0,,,,,,/content/drive/My...
haowei1000008,华A709GB,30.6786,104.070835,120,115,,2017-01-01 08:00:08,,,,,,,1.0,0.0,,,,/content/drive/My...
zouan1000007,华A58M83,28.210856,112.979521,139,177,,2017-01-01 08:00:12,,,,,,1.0,,,,,,/content/drive/My...
duxu1000009,华AT75H8,38.500677,106.210269,137,118,,2017-01-01 08:00:13,,,,,,1.0,,,,,,/content/drive/My...
duxu1000009,华AT75H8,38.500517,106.210581,120,118,,2017-01-01 08:00:14,,,,,,,1.0,1.0,,,,/content/drive/My...
zouan1000007,华A58M83,28.207153,112.97976,131,177,,2017-01-01 08:00:24,,1.0,,,,,,,,,,/content/drive/My...
haowei1000008,华A709GB,30.675895,104.076702,130,115,,2017-01-01 08:00:29,,,,,,1.0,,,,,,/content/drive/My...
haowei1000008,华A709GB,30.675895,104.076702,120,115,,2017-01-01 08:00:29,,,,,,,1.0,0.0,,,,/content/drive/My...
duxu1000009,华AT75H8,38.498514,106.214488,105,118,,2017-01-01 08:00:29,,,,,,,,,,1.0,,/content/drive/My...


In [145]:
#TEST code
# Run the aggregation query held in `sql` (it selects FROM the "summary" temp table).
group_data = sqlContext.sql(sql)
# Bare last expression: display the aggregated result.
group_data

first(recordID),first(driverID),first(carPlateNumber),recordDAY,recordHOUR,sum(CAST(isRapidlySpeedup AS DOUBLE)),sum(CAST(isRapidlySlowdown AS DOUBLE)),sum(CAST(isNeutralSlide AS DOUBLE)),sum(CAST(isNeutralSlideFinished AS DOUBLE)),sum(CAST(neutralSlideTime AS DOUBLE)),sum(CAST(isOverspeed AS DOUBLE)),sum(CAST(isOverspeedFinished AS DOUBLE)),sum(CAST(overspeedTime AS DOUBLE)),sum(CAST(isFatigueDriving AS DOUBLE)),sum(CAST(isHthrottleStop AS DOUBLE)),sum(CAST(isOilLeak AS DOUBLE))
/content/drive/My...,duxu1000009,华AT75H8,2017-01-01 17:01:06,17,3.0,2.0,3.0,3.0,43.0,42.0,42.0,307.0,46.0,4.0,2.0
/content/drive/My...,zouan1000007,华A58M83,2017-01-01 11:00:46,11,4.0,4.0,2.0,2.0,28.0,40.0,40.0,365.0,42.0,5.0,2.0
/content/drive/My...,xiexiao1000001,华AEB132,2017-01-01 16:00:20,16,1.0,3.0,1.0,1.0,12.0,33.0,33.0,425.0,37.0,4.0,2.0
/content/drive/My...,shenxian1000004,华ADJ750,2017-01-01 10:00:24,10,4.0,4.0,4.0,4.0,37.0,39.0,38.0,301.0,40.0,2.0,4.0
/content/drive/My...,duxu1000009,华AT75H8,2017-01-01 09:00:04,9,1.0,4.0,,,,33.0,34.0,304.0,39.0,5.0,2.0
/content/drive/My...,hanhui1000002,华AZI419,2017-01-01 08:00:32,8,9.0,5.0,5.0,5.0,19.0,24.0,24.0,342.0,48.0,3.0,2.0
/content/drive/My...,zengpeng1000000,华AZQ110,2017-01-01 14:00:06,14,5.0,5.0,2.0,2.0,5.0,11.0,11.0,144.0,25.0,2.0,1.0
/content/drive/My...,panxian1000005,华AX542C,2017-01-01 10:00:01,10,4.0,3.0,3.0,3.0,63.0,32.0,33.0,382.0,56.0,3.0,5.0
/content/drive/My...,likun1000003,华AVM936,2017-01-01 15:01:42,15,4.0,4.0,5.0,5.0,65.0,33.0,33.0,323.0,39.0,3.0,4.0
/content/drive/My...,likun1000003,华AVM936,2017-01-01 17:00:06,17,1.0,4.0,3.0,3.0,50.0,38.0,38.0,364.0,33.0,4.0,3.0


In [None]:
# MAIN CODE: build the list of all input data paths.
# Example input and output locations (for the commented-out command-line variant below):
# s3://comp4442-group-project/data/
# s3://comp4442-group-project/output

import os
import sys


from pyspark import SparkContext, SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import lit

# Disabled: read the input and output locations from the command line instead.
# args = sys.argv
# inp = args[1]
# out = args[2]

# SQL entry point bound to the existing SparkContext `sc`.
sqlContext = SQLContext(sc)

# One path per input file, numbered 02 through 11.
# The ":02d" format spec zero-pads single-digit numbers, replacing the manual
# "0" + str(i) branch whose variable switched between int and str.
inputList = [
  f"/content/drive/MyDrive/csvdata/detail_record_2017_01_{day:02d}_08_00_00"
  for day in range(2, 12)
]

# text_file = sc.textFile(inp)
print(inputList)



            

['/content/drive/MyDrive/csvdata/detail_record_2017_01_02_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_03_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_04_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_05_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_06_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_07_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_08_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_09_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_10_08_00_00', '/content/drive/MyDrive/csvdata/detail_record_2017_01_11_08_00_00']


In [None]:
# MAIN CODE: run the Spark aggregation on every input file and save each result as CSV.

# The column list, query and schema are the same for every file, so build them once
# instead of on every loop iteration.
column_name = "driverID,carPlateNumber,Latitude,Longitude,Speed,Direction,siteName,Time,isRapidlySpeedup,isRapidlySlowdown,isNeutralSlide,isNeutralSlideFinished,neutralSlideTime,isOverspeed,isOverspeedFinished,overspeedTime,isFatigueDriving,isHthrottleStop,isOilLeak"
# Number of values every parsed row must supply (one per column name).
column_count = len(column_name.split(","))

# Per (driverID, day, hour) group: take the first recordID / driverID / plate / Time
# and sum each behaviour column. Runs against the temp view named "summary".
sql = "SELECT first(recordID),first(driverID),first(carPlateNumber),first(Time) \
                              as recordDAY,HOUR(Time) as recordHOUR,\
                              sum(isRapidlySpeedup),sum(isRapidlySlowdown),sum(isNeutralSlide),sum(isNeutralSlideFinished),\
                              sum(neutralSlideTime),sum(isOverspeed),sum(isOverspeedFinished),sum(overspeedTime),sum(isFatigueDriving),\
                              sum(isHthrottleStop),sum(isOilLeak) \
                              FROM summary \
                              GROUP BY driverID,DAY(Time),HOUR(Time)"

# create schema with the column names (every column is a nullable string)
fields = [StructField(field_name, StringType(), True) for field_name in column_name.split(",")]
schema = StructType(fields)

# `input_path` (not `input`) so the Python builtin is not shadowed by the loop variable.
for i, input_path in enumerate(inputList):
  text_file = sc.textFile(input_path)
  # split each line into a list of fields
  # keep only the lines that have at least 9 fields; shorter lines are skipped
  counts = text_file.map(lambda line: line.split(",")).filter(lambda line: len(line)>8)

  # Pad short rows with empty strings and cut long ones so each Row carries exactly
  # column_count values. Indexing p[0]..p[18] directly raised IndexError for any row
  # that passed the filter with fewer than 19 fields.
  column_data = counts.map(lambda p: Row(*(p + [""] * column_count)[:column_count]))

  # apply the schema to the RDD
  dataframe = sqlContext.createDataFrame(column_data,schema)
  # add the recordID to store where these records come from
  dataframe = dataframe.withColumn("recordID",lit(input_path))
  # register the DataFrame as a temp view (registerTempTable is deprecated)
  dataframe.createOrReplaceTempView("summary")

  # execute the SQL query and save the result to the output directory;
  # coalesce(1) reduces the result to one partition before writing, and overwrite
  # mode lets the cell be re-run without failing because "data<i>" already exists
  group_data = sqlContext.sql(sql)
  group_data.coalesce(1).write.mode("overwrite").csv("/content/drive/MyDrive/csvdata/data"+str(i))

# Stops the SparkContext: cells that use `sc` cannot run again until a new session is created.
sc.stop()