In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField,TimestampType, IntegerType, DateType, DoubleType, FloatType, StringType, BooleanType
spark = SparkSession.builder.appName("ETL").getOrCreate()
schema = StructType([
    StructField("Store", IntegerType(), True),
    StructField("Date", StringType(), True),
    StructField("Temperature", DoubleType(), True),
    StructField("Fuel_Price", DoubleType(), True),
    StructField("MarkDown1", FloatType(), True),
    StructField("MarkDown2", FloatType(), True),
    StructField("MarkDown3", FloatType(), True),
    StructField("MarkDown4", FloatType(), True),
    StructField("MarkDown5", FloatType(), True),
    StructField("CPI", FloatType(), True),
    StructField("Unemployment", StringType(), False),
    StructField("IsHoliday", BooleanType(), True),

])

df=spark.read.schema(schema).csv("C:/Users/sireesha/Downloads/Features data set.csv",header=True)
df.show(10)
##df.count()

    

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|      CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+
|    1|05/02/2010|      42.31|     2.572|     NULL|     NULL|     NULL|     NULL|     NULL|211.09636|       8.106|    false|
|    1|12/02/2010|      38.51|     2.548|     NULL|     NULL|     NULL|     NULL|     NULL|211.24217|       8.106|     true|
|    1|19/02/2010|      39.93|     2.514|     NULL|     NULL|     NULL|     NULL|     NULL|211.28914|       8.106|    false|
|    1|26/02/2010|      46.63|     2.561|     NULL|     NULL|     NULL|     NULL|     NULL|211.31964|       8.106|    false|
|    1|05/03/2010|       46.5|     2.625|     NULL|     NULL|     NULL|     NULL|     NULL|211.35014|       8.106|    false|


In [24]:
df.columns


['Store',
 'Date',
 'Temperature',
 'Fuel_Price',
 'MarkDown1',
 'MarkDown2',
 'MarkDown3',
 'MarkDown4',
 'MarkDown5',
 'CPI',
 'Unemployment',
 'IsHoliday']

In [25]:
df.show(5)

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|      CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+
|    1|05/02/2010|      42.31|     2.572|     NULL|     NULL|     NULL|     NULL|     NULL|211.09636|       8.106|    false|
|    1|12/02/2010|      38.51|     2.548|     NULL|     NULL|     NULL|     NULL|     NULL|211.24217|       8.106|     true|
|    1|19/02/2010|      39.93|     2.514|     NULL|     NULL|     NULL|     NULL|     NULL|211.28914|       8.106|    false|
|    1|26/02/2010|      46.63|     2.561|     NULL|     NULL|     NULL|     NULL|     NULL|211.31964|       8.106|    false|
|    1|05/03/2010|       46.5|     2.625|     NULL|     NULL|     NULL|     NULL|     NULL|211.35014|       8.106|    false|


In [26]:
df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: float (nullable = true)
 |-- Unemployment: string (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [27]:
df.count()

8190

In [28]:
from pyspark.sql.functions import col,sum,to_date,when,lit
df= df.replace('NA', None)
df.select([col(c).isNull().alias(c) for c in df.columns]).agg(*[sum(col(c).cast("int")).alias(c) for c in df.columns]).show()



+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+
|Store|Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI|Unemployment|IsHoliday|
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+
|    0|   0|          0|         0|     4158|     5269|     4577|     4726|     4140|585|         585|        0|
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+



In [29]:
df = df.dropDuplicates()
df.count()

8190

In [30]:
df = df.fillna({'MarkDown1': 0, 'MarkDown2': 0, 'MarkDown3': 0, 'MarkDown4': 0, 'MarkDown5': 0})
mean_cpi = df.agg({'CPI': 'mean'}).collect()[0][0]
mean_unemployment = df.agg({'Unemployment': 'mean'}).collect()[0][0]
df = df.fillna({'CPI': mean_cpi, 'Unemployment': mean_unemployment})

df.select([col(c).isNull().alias(c) for c in df.columns]).agg(*[sum(col(c).cast("int")).alias(c) for c in df.columns]).show()



+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+
|Store|Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI|Unemployment|IsHoliday|
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+
|    0|   0|          0|         0|        0|        0|        0|        0|        0|  0|           0|        0|
+-----+----+-----------+----------+---------+---------+---------+---------+---------+---+------------+---------+



In [32]:
df.show(10)

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|      CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+
|    1|11/01/2013|      50.32|     3.243|  6520.24|  16134.6|    12.17|   774.55|  4001.25|224.20155|       6.525|    false|
|    4|17/08/2012|      81.41|     3.552|  2531.63|    15.04|     5.54|  1269.13|   3528.9|130.79097|       4.077|    false|
|    5|27/07/2012|      85.06|     3.407|  2696.67|     7.75|     0.36|  3104.68|  2152.23|222.52972|       5.603|    false|
|    5|03/08/2012|      86.91|     3.417|  7365.26|    14.48|      0.7|   3640.7|  2517.31|222.53851|       5.603|    false|
|    6|07/09/2012|      86.33|      3.73| 14105.12|    47.96|     24.6|  3040.62|  3558.18|224.05602|       5.668|     true|


In [12]:
df = df.withColumn("Unemployment", col("Unemployment").cast("float")) 
       
df.printSchema()


root
 |-- Store: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: float (nullable = false)
 |-- MarkDown2: float (nullable = false)
 |-- MarkDown3: float (nullable = false)
 |-- MarkDown4: float (nullable = false)
 |-- MarkDown5: float (nullable = false)
 |-- CPI: float (nullable = false)
 |-- Unemployment: float (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [13]:
df.filter(col("Date").isNull()).select("Date").show()


+----+
|Date|
+----+
+----+



In [18]:
df.write.option("header", "true").mode("overwrite").csv("C:/Users/sireesha/Downloads/Features_data_cleaned")


In [22]:

import os
from google.cloud import bigquery
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/sireesha/Downloads/glassy-land-429507-s9-58daca9d3510.json"
client = bigquery.Client()
project_id = "glassy-land-429507-s9"
dataset_id = "onlineretail"
table_id = "table_new"
csv_file_path = "C:/Users/sireesha/Downloads/Features_data_cleaned/part-00000-d239da2d-4677-4bd9-948b-a09bb05bcccc-c000.csv"

table_ref = f"{project_id}.{dataset_id}.{table_id}"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,      
    autodetect=True,           
    write_disposition="WRITE_TRUNCATE")

with open(csv_file_path, "rb") as source_file:
    load_job = client.load_table_from_file(
        source_file,
        destination=table_ref,
        job_config=job_config,
    )

load_job.result()

destination_table = client.get_table(table_ref)
print(f"Loaded {destination_table.num_rows} rows into {table_ref}.") 




Loaded 8190 rows into glassy-land-429507-s9.onlineretail.table_new.
