# ETL (to test on local with a sample of log files)

In [1]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType,StructType,StructField
from pyspark.sql import functions as F

In [2]:
def createSparkSession():
    """
    Desccription : initiate sparkSession (configure and create or get application if it exists)
    To deal with dataframe
    """
    spark = SparkSession.builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()
        
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

    return spark

In [3]:
# Load json files
spark = createSparkSession()

In [4]:
path = "~/recognizer_logs/*.json"
df = spark.read.json(path, multiLine=True)

In [6]:
df.printSchema()

root
 |-- duration: long (nullable = true)
 |-- frame_id: long (nullable = true)
 |-- plates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- car: struct (nullable = true)
 |    |    |    |-- bodyStyle: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- confidence: double (nullable = true)
 |    |    |    |    |    |-- klass: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- color: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- confidence: double (nullable = true)
 |    |    |    |    |    |-- klass: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- confidence: double (nullable = true)
 |    |    |    |-- makeModelYear: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |

### Explode structure and select frame id, plates and timestamp

In [None]:
# explode in level 1
explodeDFL1 = df.select(F.col("frame_id"),\
                        F.explode("plates").alias("plate"),\
                       F.col("timestamp").alias("timestamp"))
#print schema
explodeDFL1.printSchema()

## I - Car caracterisques

### Create bodyStyle table

In [None]:
bodyStyleFields = ["frame_id", "bodyStyle.name", "bodyStyle.confidence", "timestamp"]
bodyStyleFieldsName = ["frameID", "bodyStyleName", "bodyStyleConfidence", "createDate"]

bodyStyleDF = explodeDFL1.select(F.col("frame_id"),\
                                F.col("plate.car.bodyStyle")[0].alias("bodyStyle"),\
                                F.col("timestamp"))

bodyStyleTable = bodyStyleDF.select(*bodyStyleFields).toDF(*bodyStyleFieldsName)
bodyStyleTable.show()

### Create color table

In [None]:
colorFields = ["frame_id", "color.name", "color.confidence", "timestamp"]
colorFieldsName = ["frameID", "colorName", "colorConfidence", "createDate"]
colorDF = explodeDFL1.select(F.col("frame_id"),\
                             F.col("plate.car.color")[0].alias("color"),\
                             F.col("timestamp"))

# Create color table:
colorTable = colorDF.select(*colorFields).toDF(*colorFieldsName)
colorTable.show()

### Create makeModelYear table

In [None]:
makeModelYearFields = ["frame_id", "makeModelYear.model", "makeModelYear.confidence", "makeModelYear.make",
                       "makeModelYear.year", "timestamp"]
cmakeModelYearFieldsName = ["frameID", "model", "makeModelConfidence", "make","year", "createDate"]

makeModelYearDF = explodeDFL1.select(F.col("frame_id"),\
                             F.col("plate.car.makeModelYear")[0].alias("makeModelYear"),\
                             F.col("timestamp"))

# Create color table:
makeModelYearTable = makeModelYearDF.select(*makeModelYearFields).toDF(*cmakeModelYearFieldsName)
makeModelYearTable.show()

### Create WarpedBox table

In [None]:
warpedBoxCarFields = ["frame_id", F.col("plate.car.warpedBox")[0], F.col("plate.car.warpedBox")[1], 
                      F.col("plate.car.warpedBox")[4], F.col("plate.car.warpedBox")[5], "timestamp"]
warpedBoxCarFieldsName = ["frameID", "warpedBoxV1", "warpedBoxV2", "warpedBoxV3","warpedBoxV4", "createDate"]

warpedBoxCarDF = explodeDFL1.select(*warpedBoxCarFields).toDF(*warpedBoxCarFieldsName)
warpedBoxCarDF.show()

### Create car transactional table

In [None]:
carDF = explodeDFL1.select(F.col("frame_id"),\
                          F.col("plate.car.confidence").alias("globalConfidence"),\
                          F.col("timestamp").alias("createDate"))
carDF.show()

## II - plate caracterisques

### Create country table

In [None]:
countryFields = ["frame_id", "plateCountry.name", "plateCountry.state", "plateCountry.code", 
                 "plateCountry.confidence","timestamp"]

countryFieldsName = ["frameID", "country", "countryState", "countryCode","countryConfidence", "createDate"]

countryDF = explodeDFL1.select(F.col("frame_id"),\
                             F.col("plate.country")[0].alias("plateCountry"),\
                             F.col("timestamp"))

# Create color table:
countryTable = countryDF.select(*countryFields).toDF(*countryFieldsName)
countryTable.show()

### Create warpedBox plate table

In [None]:
warpedBoxPlateFields = ["frame_id", "plate.text", F.col("plate.warpedBox")[0],F.col("plate.warpedBox")[1],
                        F.col("plate.warpedBox")[4], F.col("plate.warpedBox")[5], "timestamp"]
warpedBoxPlateFieldsName = ["frameID", "pateText", "warpedBoxV0", "warpedBoxV1", "warpedBoxV4","warpedBoxV5"
                            , "createDate"]

warpedBoxPlateDF = explodeDFL1.select(*warpedBoxPlateFields).toDF(*warpedBoxPlateFieldsName)
warpedBoxPlateDF.show()

### Create plate transactional table

In [None]:
plateFields = ["frame_id", "plate.text", F.col("plate.confidences")[1], F.col("plate.confidences")[2],"timestamp"]
plateFieldsName = ["frame_id", "plateText", "plateConfidenceV1", "plateConfidenceV2","createDate"]

plateDF = explodeDFL1.select(*plateFields).toDF(*plateFieldsName)

plateDF.show()