Pseudocode

---- PREPARE DATA BY SPLITTING INTO INCREMENTAL LOADS

1. Convert Date to a date data type
if yy > 25 then:
    Custom function that maps two-digit years to the 20th century (97 becomes 1997, not 2097), using strftime
else:
    to_date

2. Function to iterate, for the count of distinct yyyy-mm months:
- dt = each date in turn
- append that day's data to df
- save as '/date/csv_name' + dt + '.csv'
- advance dt by one day (dateadd)
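
Steps 1 and 2 above can be sketched in plain Python before porting them to Spark. This is only a minimal sketch under stated assumptions: the pivot of 25 comes from the rule in step 1, while the helper names `parse_two_digit_date` and `split_by_day`, the `out_dir` argument, and the `<prefix>_<yyyy-mm-dd>.csv` file naming are hypothetical and chosen for illustration.

```python
import csv
from collections import defaultdict
from datetime import date
from pathlib import Path

PIVOT_YY = 25  # two-digit years above this are treated as 19yy (rule from step 1)


def parse_two_digit_date(text, pivot=PIVOT_YY):
    """Parse 'dd-mm-yy' into a date, mapping yy > pivot to the 1900s."""
    day, month, yy = (int(part) for part in text.split("-"))
    century = 1900 if yy > pivot else 2000
    return date(century + yy, month, day)


def split_by_day(rows, out_dir, prefix="rice"):
    """Write one CSV per calendar day and return the paths in date order.

    `rows` is an iterable of dicts with a 'Date' key in dd-mm-yy form.
    The file naming scheme is an assumption, not the notebook's.
    """
    by_day = defaultdict(list)
    for row in rows:
        by_day[parse_two_digit_date(row["Date"])].append(row)

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for day in sorted(by_day):
        day_rows = by_day[day]
        path = out_dir / f"{prefix}_{day.isoformat()}.csv"
        with path.open("w", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=list(day_rows[0].keys()))
            writer.writeheader()
            writer.writerows(day_rows)
        written.append(path)
    return written
```

Grouping by the parsed date rather than stepping one day at a time skips days that have no rows, which may or may not be the desired behaviour for the incremental load.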

---- CONVERT EACH SAVED CSV FILE TO PARQUET AND SAVE IT TO 'LANDING' FOLDER

---- BRONZE LAYER: ConformInterface
1. Read the parquet file for a specific date from LANDING, along with schema.json
2. Write PySpark code to verify the parquet schema against the schema defined in schema.json.
3. Move the parquet file to BRONZE if verification succeeds; otherwise throw an error and stop processing.
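
The comparison in step 2 can be sketched without a Spark session, because PySpark's `df.dtypes` already returns plain `(column_name, type_string)` tuples. The layout of schema.json is not given in these notes, so the `{"columns": [{"name": ..., "type": ...}]}` shape below is an assumption, as are the function names `schema_mismatches` and `verify_or_raise`.

```python
import json


def schema_mismatches(actual_dtypes, expected_schema_json):
    """Compare (name, type) pairs against an expected schema.

    `actual_dtypes` has the shape of PySpark's `df.dtypes`.
    The JSON layout is assumed: {"columns": [{"name": "...", "type": "..."}]}.
    Returns a list of problems; an empty list means the schemas match.
    """
    columns = json.loads(expected_schema_json)["columns"]
    expected = {col["name"]: col["type"] for col in columns}
    actual = dict(actual_dtypes)

    problems = []
    for name, dtype in expected.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif actual[name] != dtype:
            problems.append(
                f"type mismatch for {name}: expected {dtype}, got {actual[name]}"
            )
    for name in actual:
        if name not in expected:
            problems.append(f"unexpected column: {name}")
    return problems


def verify_or_raise(actual_dtypes, expected_schema_json):
    """Raise ValueError so processing stops when the schemas differ."""
    problems = schema_mismatches(actual_dtypes, expected_schema_json)
    if problems:
        raise ValueError("schema verification failed: " + "; ".join(problems))
```

In the notebook this would be called as `verify_or_raise(df.dtypes, schema_text)` before the file is moved to BRONZE. Column order is deliberately ignored here; an order-sensitive check would compare the two lists directly.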

---- SILVER LAYER: StandardizeColumns
1. Read the parquet file from BRONZE for the same date
2. Add audit columns to the Spark dataframe
3. Save the dataframe to SILVER under the same name and delete the file in BRONZE

---- GOLD LAYER: ChangeDataCapture
1. Perform incremental load of only changed data
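
One common way to find "only changed data" is to hash the tracked values of each row and compare them against the previous snapshot by business key. The sketch below illustrates that idea in plain Python and is not the planned Spark implementation: the function names and the choice of key and value columns are assumptions, and deleted rows are not detected.

```python
import hashlib


def row_hash(row, value_columns):
    """Stable hash of the tracked value columns of one row."""
    payload = "\x1f".join(str(row[col]) for col in value_columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def changed_rows(previous, current, key_columns, value_columns):
    """Return rows of `current` that are new or whose tracked values changed.

    Both inputs are lists of dicts. Rows that disappeared from `current`
    are not reported in this sketch.
    """
    seen = {
        tuple(row[k] for k in key_columns): row_hash(row, value_columns)
        for row in previous
    }
    delta = []
    for row in current:
        key = tuple(row[k] for k in key_columns)
        if seen.get(key) != row_hash(row, value_columns):
            delta.append(row)
    return delta
```

For the rice dataset, a plausible (assumed) key is `("Centre_Name", "Commodity_Name")` with `("Price",)` as the tracked value, so a centre is reloaded only when its price differs from the last loaded snapshot.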

---- MODEL SQL DATABASE SCHEMA; IT WILL ACT AS THE WAREHOUSE (EXPLORE FREE-TIER OPTIONS: dbdiagram.io, erdlab.io)

---- COPY GOLD DATA INTO SQL WAREHOUSE

---- USE DBT FREE TIER TO INTEGRATE, APPLY ANY TRANSFORMATIONS, AND CREATE EXTRACT TABLES

---- SERVE EXTRACT TABLE TO SERVING FOLDER AS CSV FOR ANALYSIS

---- CREATE POWERBI DASHBOARD

---- CREATE ML MODEL TO PREDICT FUTURE PRICES BASED ON DOMESTIC FACTORS SUCH AS MSP, PRODUCTION





In [23]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
from pyspark.sql.types import *

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
rice_path = 'datasets/daily_retail_price_Rice_upto-apr_2015.csv'
df = spark.read.csv(rice_path, header=True)

In [5]:
df.show(5)

+--------+-----------+--------------+-----+
|    Date|Centre_Name|Commodity_Name|Price|
+--------+-----------+--------------+-----+
|25-11-97|      DELHI|          Rice|   10|
|25-11-97|     SHIMLA|          Rice|   12|
|25-11-97|    LUCKNOW|          Rice|  6.5|
|25-11-97|  AHMEDABAD|          Rice|   10|
|25-11-97|     BHOPAL|          Rice|    9|
+--------+-----------+--------------+-----+
only showing top 5 rows



In [6]:
# Count the distinct dates reported per centre and export the result
df1 = df.groupBy('Centre_Name').agg(F.countDistinct('Date').alias('no_of_dates'))
df1.toPandas().to_csv('locations.csv')

In [7]:
# Convert the string Date field to a date type (yyyy-mm-dd)
# NOTE: two-digit years such as 97 parse as 2097 here; still to do: year = year - 100 if year > 2025

df = df.withColumn("date_column", F.to_date(df["Date"], "dd-MM-yy"))

In [33]:
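# Add an identity column

# coalesce(1) keeps all rows in one partition, so monotonically_increasing_id() yields consecutive values
df = df.coalesce(1).withColumn("idx", F.monotonically_increasing_id())
w = W.orderBy("idx")
df = df.withColumn("id", F.row_number().over(w))  # consecutive ids starting at 1
df = df.drop("idx")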


In [34]:
df.show(5)

+--------+-----------+--------------+-----+-----------+---+
|    Date|Centre_Name|Commodity_Name|Price|date_column| id|
+--------+-----------+--------------+-----+-----------+---+
|25-11-97|      DELHI|          Rice|   10| 2097-11-25|  1|
|25-11-97|     SHIMLA|          Rice|   12| 2097-11-25|  2|
|25-11-97|    LUCKNOW|          Rice|  6.5| 2097-11-25|  3|
|25-11-97|  AHMEDABAD|          Rice|   10| 2097-11-25|  4|
|25-11-97|     BHOPAL|          Rice|    9| 2097-11-25|  5|
+--------+-----------+--------------+-----+-----------+---+
only showing top 5 rows



In [27]:
df.drop(df.id).show(5)

+--------+-----------+--------------+-----+-----------+
|    Date|Centre_Name|Commodity_Name|Price|date_column|
+--------+-----------+--------------+-----+-----------+
|25-11-97|      DELHI|          Rice|   10| 2097-11-25|
|25-11-97|     SHIMLA|          Rice|   12| 2097-11-25|
|25-11-97|    LUCKNOW|          Rice|  6.5| 2097-11-25|
|25-11-97|  AHMEDABAD|          Rice|   10| 2097-11-25|
|25-11-97|     BHOPAL|          Rice|    9| 2097-11-25|
+--------+-----------+--------------+-----+-----------+
only showing top 5 rows



In [20]:
df.show(5)

+--------+-----------+--------------+-----+-----------+---+
|    Date|Centre_Name|Commodity_Name|Price|date_column| id|
+--------+-----------+--------------+-----+-----------+---+
|25-11-97|      DELHI|          Rice|   10| 2097-11-25|  0|
|25-11-97|     SHIMLA|          Rice|   12| 2097-11-25|  1|
|25-11-97|    LUCKNOW|          Rice|  6.5| 2097-11-25|  2|
|25-11-97|  AHMEDABAD|          Rice|   10| 2097-11-25|  3|
|25-11-97|     BHOPAL|          Rice|    9| 2097-11-25|  4|
+--------+-----------+--------------+-----+-----------+---+
only showing top 5 rows

