In [22]:
from pyspark.sql.functions import col, trim, to_date, regexp_replace, round
from pyspark.sql.types import DoubleType

In [23]:
from pyspark.sql import SparkSession
import os
# Tell PySpark which Python interpreter to use. The path below is specific to one
# workstation, so it is applied only as a fallback: a PYSPARK_PYTHON value that is
# already set in the environment is left untouched.
os.environ.setdefault(
    "PYSPARK_PYTHON",
    r"C:\Users\Dharmendra\AppData\Local\Programs\Python\Python310\pythonw.exe",
)

In [24]:
# Build (or reuse) a local SparkSession named "SalesDB" with 2g of driver memory.
# The PostgreSQL JDBC jar is added to the driver classpath; the path is machine-specific.
spark = (
    SparkSession.builder
    .master("local")
    .appName("SalesDB")
    .config("spark.driver.memory", "2g")
    .config(
        "spark.driver.extraClassPath",
        "D:\\Data Engineering\\Sales_Aug12\\jdbc\\postgresql-42.6.0.jar",
    )
    .getOrCreate()
)

Initial transformation steps:-

1. Import the data into a DataFrame.

In [25]:
# Read data.csv with its first row as the header and let Spark infer column types.
raw_data = spark.read.csv(
    "data.csv",
    header=True,
    inferSchema=True,
)

In [26]:
# Grab the inferred schema so every column's name and Spark type can be inspected.
df_schema = raw_data.schema

# Emit one line per column: its (unstripped) name and the inferred data type.
for field in df_schema.fields:
    column_name, spark_type = field.name, field.dataType
    print(f"Column '{column_name}' has data type: {spark_type}")



Column 'country' has data type: StringType()
Column 'order_value_EUR ' has data type: StringType()
Column ' cost ' has data type: DoubleType()
Column 'date' has data type: StringType()
Column 'category' has data type: StringType()
Column 'customer_name' has data type: StringType()
Column 'sales_manager' has data type: StringType()
Column 'sales_rep' has data type: StringType()
Column 'device_type' has data type: StringType()
Column 'order_id' has data type: StringType()


2. Change the data types accordingly:
	country	 str,
	category str,
	customer_name	 str,
	sales_manager str,	
	sales_rep	str,
	device_type	str,
	order_id str,
	order_value_EUR 	 double,
	cost 	double,
	date	date	

In [27]:
# Expected column names of the sales data, in source order, with surrounding
# whitespace removed. (A stray empty-string entry was removed: it does not
# correspond to any column.)
columns = (
    "country",
    "order_value_EUR",
    "cost",
    "date",
    "category",
    "customer_name",
    "sales_manager",
    "sales_rep",
    "device_type",
    "order_id",
)

In [28]:
# Some headers carry leading/trailing spaces (e.g. 'order_value_EUR ' and ' cost '),
# so rebuild the DataFrame with whitespace-stripped column names.
new_column_names = list(map(str.strip, raw_data.columns))
raw_data = raw_data.toDF(*new_column_names)

In [29]:
# Duplicate order_value_EUR under the shorter name order_value; this statement
# leaves the source column in place.
raw_data = raw_data.withColumn("order_value", raw_data["order_value_EUR"])
                    

In [30]:
# Keep only digits and '.' in order_value. This removes thousands-separator commas
# and also every other character that is not a digit or a dot.
cleaned_order_value = regexp_replace(col("order_value"), "[^0-9.]", "")
raw_data = raw_data.withColumn("order_value", cleaned_order_value)

In [31]:
# Finalise column types and derive the profit metrics:
#   - order_value: cleaned string -> double
#   - date: parsed with the pattern M/d/yyyy
#   - order_value_EUR: dropped
#   - profit: order_value - cost, rounded to 2 decimals
#   - profit_percentage: (rounded) profit / order_value * 100, rounded to 2 decimals
# Note: `round` here is the pyspark.sql.functions import, not the Python builtin.
raw_data = (
    raw_data
    .withColumn("order_value", col("order_value").cast("double"))
    .withColumn("date", to_date(col("date"), "M/d/yyyy"))
    .drop("order_value_EUR")
    .withColumn("profit", round(col("order_value") - col("cost"), 2))
    .withColumn(
        "profit_percentage",
        round((col("profit") / col("order_value")) * 100, 2),
    )
)
                    

In [36]:
# Load the transformed raw_data DataFrame into the Postgres table raw_tbl over JDBC.
#
# Credentials are read from SALES_DB_USER / SALES_DB_PASSWORD when those environment
# variables are set; the previous literal values remain as fallbacks so existing
# local setups keep working.
# TODO: remove the literal fallbacks once the environment variables are configured.
# NOTE(review): no save mode is set, so the DataFrameWriter default applies
# (documented as erroring when the table already exists) - confirm this is intended
# for re-runs.
(
    raw_data.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5433/sales")
    .option("dbtable", "raw_tbl")
    .option("user", os.environ.get("SALES_DB_USER", "postgres"))
    .option("password", os.environ.get("SALES_DB_PASSWORD", "123"))
    .option("driver", "org.postgresql.Driver")
    .save()
)

In [32]:
#4. Distribute the data into three more tables other than Sales_tbl.





In [33]:
# Build the revenue DataFrame (country, order_value, date, order_id) from raw_data
# and load it into the Postgres table revenue_tbl over JDBC.
#
# Credentials are read from SALES_DB_USER / SALES_DB_PASSWORD when those environment
# variables are set; the previous literal values remain as fallbacks so existing
# local setups keep working.
# TODO: remove the literal fallbacks once the environment variables are configured.
# NOTE(review): no save mode is set, so the DataFrameWriter default applies
# (documented as erroring when the table already exists) - confirm for re-runs.
revenue_df = raw_data.select("country", "order_value", "date", "order_id")
(
    revenue_df.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5433/sales")
    .option("dbtable", "revenue_tbl")
    .option("user", os.environ.get("SALES_DB_USER", "postgres"))
    .option("password", os.environ.get("SALES_DB_PASSWORD", "123"))
    .option("driver", "org.postgresql.Driver")
    .save()
)

In [34]:
# Build the profit DataFrame (country, date, category, profit, profit_percentage)
# from raw_data and load it into the Postgres table profit_tbl over JDBC.
#
# Credentials are read from SALES_DB_USER / SALES_DB_PASSWORD when those environment
# variables are set; the previous literal values remain as fallbacks so existing
# local setups keep working.
# TODO: remove the literal fallbacks once the environment variables are configured.
# NOTE(review): no save mode is set, so the DataFrameWriter default applies
# (documented as erroring when the table already exists) - confirm for re-runs.
profit_df = raw_data.select("country", "date", "category", "profit", "profit_percentage")
(
    profit_df.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5433/sales")
    .option("dbtable", "profit_tbl")
    .option("user", os.environ.get("SALES_DB_USER", "postgres"))
    .option("password", os.environ.get("SALES_DB_PASSWORD", "123"))
    .option("driver", "org.postgresql.Driver")
    .save()
)

In [35]:
# Build the correlation DataFrame (sales_manager, sales_rep, order_value, date)
# from raw_data and load it into the Postgres table correlation_tbl over JDBC.
#
# Credentials are read from SALES_DB_USER / SALES_DB_PASSWORD when those environment
# variables are set; the previous literal values remain as fallbacks so existing
# local setups keep working.
# TODO: remove the literal fallbacks once the environment variables are configured.
# NOTE(review): no save mode is set, so the DataFrameWriter default applies
# (documented as erroring when the table already exists) - confirm for re-runs.
corelation_df = raw_data.select("sales_manager", "sales_rep", "order_value", "date")
(
    corelation_df.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5433/sales")
    .option("dbtable", "correlation_tbl")
    .option("user", os.environ.get("SALES_DB_USER", "postgres"))
    .option("password", os.environ.get("SALES_DB_PASSWORD", "123"))
    .option("driver", "org.postgresql.Driver")
    .save()
)