## INCREMENTAL LOAD

In [1]:
file_date = '2021-04-18'

<div style="max-width:1400px;margin-center: auto">
<img src="images\results.png" width="600"/>
</div>

In [2]:
import findspark
findspark.init()
import pyspark
from delta import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

#  Create a spark session with Delta
builder = pyspark.sql.SparkSession.builder.appName("dbcreation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# Create spark context
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
from pyspark.sql.types import IntegerType, FloatType, StringType, StructField, StructType
from pyspark.sql.functions import col, current_timestamp,count,max,lit,desc

In [4]:
results_schema = StructType(fields=[StructField("resultId", IntegerType(), False),
                                    StructField("raceId", IntegerType(), True),
                                    StructField("driverId", IntegerType(), True),
                                    StructField("constructorId", IntegerType(), True),
                                    StructField("number", IntegerType(), True),
                                    StructField("grid", IntegerType(), True),
                                    StructField("position", IntegerType(), True),
                                    StructField("positionText", StringType(), True),
                                    StructField("positionOrder", IntegerType(), True),
                                    StructField("points", FloatType(), True),
                                    StructField("laps", IntegerType(), True),
                                    StructField("time", StringType(), True),
                                    StructField("milliseconds", IntegerType(), True),
                                    StructField("fastestLap", IntegerType(), True),
                                    StructField("rank", IntegerType(), True),
                                    StructField("fastestLapTime", StringType(), True),
                                    StructField("fastestLapSpeed", FloatType(), True),
                                    StructField("statusId", StringType(), True)])

In [5]:
results_df = spark.read.json(f'raw files\\{file_date}\\results.json', schema = results_schema)
results_df.show(2)

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|   24986|  1053|     830|            9|    33|   3|       1|           1|            1|  25.0|  63|2:02:34.598|     7354598|        60|   2|      1:17.524|         227.96|       1|
|   24987|  1053|       1|          131|    44|   1|       2|           2|            2|  19.0|  63|    +22.000|     7376598|        60|   1|      1:16.702|        230.403|       1|
+--------+------+--------+-------------+------+----+--------+------------+-------------+--

In [6]:
cut_over_df = spark.read.json(f'raw files\\2021-03-21\\results.json', schema = results_schema)
df_file1 = spark.read.json(f'raw files\\2021-03-28\\results.json', schema = results_schema)
print('cut_over count of distinct_race_id : ',cut_over_df.select('raceId').distinct().count())
print('df_file1 count of distinct_race_id  : ',df_file1.select('raceId').distinct().count())
print('cut_over_count for each raceId : ')
cut_over_df.groupBy('raceId').agg(count('*')).show(3)
print('df_file1_count for each raceId : ')
df_file1.groupBy('raceId').agg(count('*')).show()
print('cut_over_max of raceId : ')
cut_over_df.agg(max('raceId')).show()

cut_over count of distinct_race_id :  1035
df_file1 count of distinct_race_id  :  1
cut_over_count for each raceId : 
+------+--------+
|raceId|count(1)|
+------+--------+
|   148|      22|
|   463|      29|
|   471|      32|
+------+--------+
only showing top 3 rows

df_file1_count for each raceId : 
+------+--------+
|raceId|count(1)|
+------+--------+
|  1052|      20|
+------+--------+

cut_over_max of raceId : 
+-----------+
|max(raceId)|
+-----------+
|       1047|
+-----------+



In [7]:
results_with_columns_df = results_df.withColumnRenamed("resultId", "result_id") \
                                    .withColumnRenamed("raceId", "race_id") \
                                    .withColumnRenamed("driverId", "driver_id") \
                                    .withColumnRenamed("constructorId", "constructor_id") \
                                    .withColumnRenamed("positionText", "position_text") \
                                    .withColumnRenamed("positionOrder", "position_order") \
                                    .withColumnRenamed("fastestLap", "fastest_lap") \
                                    .withColumnRenamed("fastestLapTime", "fastest_lap_time") \
                                    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed") \
                                    .withColumn("ingestion_date", current_timestamp())\
                                    .withColumn("file_date",lit(file_date))
results_with_columns_df.show(2)

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------+--------------------+----------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|       time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_speed|statusId|      ingestion_date| file_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------+--------------------+----------+
|    24986|   1053|      830|             9|    33|   3|       1|            1|             1|  25.0|  63|2:02:34.598|     7354598|         60|   2|        1:17.524|           227.96|       1|2024-02-11 15:21:...|2021-04-18|
|    24987|   1053|        1|           131|    44|   1|       2|            2|             2|  19.0

In [8]:
results_final_df = results_with_columns_df.drop(col("statusId"))
results_final_df.show(2)

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------------------+----------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|       time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_speed|      ingestion_date| file_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------------------+----------+
|    24986|   1053|      830|             9|    33|   3|       1|            1|             1|  25.0|  63|2:02:34.598|     7354598|         60|   2|        1:17.524|           227.96|2024-02-11 15:21:...|2021-04-18|
|    24987|   1053|        1|           131|    44|   1|       2|            2|             2|  19.0|  63|    +22.000|     7376598|     

In [9]:
%run "common_functions.ipynb"

In [None]:
results_deduped_df = results_final_df.dropDuplicates(['race_id', 'driver_id'])

In [10]:
merge_condition = "tgt.result_id = src.result_id AND tgt.race_id = src.race_id"
merge_delta_data(results_deduped_df, 'default', 'results', 'E:/unused/Udemy/Spark_practice/raw/Delta%20lake/spark-warehouse',merge_condition,'race_id')

In [11]:
df = spark.sql('SELECT * FROM default.results')
print('df_file1_count for each raceId : ')
df.groupBy('race_id').agg(count('*')).orderBy(desc('race_id')).show()

df_file1_count for each raceId : 
+-------+--------+
|race_id|count(1)|
+-------+--------+
|   1053|      20|
+-------+--------+

