In [None]:

# Optional one-off environment setup, left disabled.
# NOTE(review): great-expectations is not referenced by any cell visible in
# this notebook — confirm it is still required before re-enabling.
#!pip install great-expectations

In [14]:
import findspark

# NOTE(review): pyspark is imported only after findspark.init() has run;
# presumably init() is what makes pyspark importable here — keep this order.
findspark.init()
# The return value is discarded (this call is not the cell's last
# expression), so nothing is displayed by it.
findspark.find()
import pyspark

In [15]:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import sys

# The helper module lives under ./src; add it to the import path once so that
# re-running this cell does not keep appending duplicate entries.
if 'src' not in sys.path:
    sys.path.append('src')

# Import the helpers by name instead of `from utils import *`, so it is clear
# where each helper comes from and nothing else leaks into the namespace.
from utils import add_new_field, update_field_type

# Get (or reuse) the SparkSession for this notebook.
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

# Load the CSV: the first row is treated as the header and column types are inferred.
df = spark.read.csv("online retail.csv", header=True, inferSchema=True)

# Preview the loaded DataFrame.
df.show()


+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [16]:
# Keep each schema version in a dict keyed by a version label,
# starting with the DataFrame's current schema.
schema_df = df.schema
schema_store = dict(version_1=schema_df)
print(schema_store)

{'version_1': StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', StringType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', IntegerType(), True), StructField('Country', StringType(), True)])}


In [17]:
# Column names recorded in the version_1 schema snapshot.
field_names = [field.name for field in schema_store['version_1']]

# Name, value and data type passed to add_new_field for the new column.
new_field = "City"
new_field_value = "London"
new_field_data_type = "string"

# Guard against the DataFrame's *current* columns rather than the frozen
# version_1 snapshot, so re-running this cell does not try to add the
# column a second time.
if new_field not in df.columns:
    df = add_new_field(df, new_field, new_field_value, new_field_data_type)
    df.show()





+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|  City|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|London|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|London|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|London|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|London|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|London|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|London|
|   536365

In [18]:


# Column to convert and the data type to convert it to.
existing_field_name = "Quantity"
updated_data_type = "double"

# Check the DataFrame's current columns instead of `field_names`: that list
# was built from the version_1 schema and does not include columns added
# since, so it could wrongly skip a column that does exist.
if existing_field_name in df.columns:
    df = update_field_type(df, existing_field_name, updated_data_type)
    df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|  City|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+------+
|   536365|   85123A|WHITE HANGING HEA...|     6.0|12/1/2010 8:26|     2.55|     17850|United Kingdom|London|
|   536365|    71053| WHITE METAL LANTERN|     6.0|12/1/2010 8:26|     3.39|     17850|United Kingdom|London|
|   536365|   84406B|CREAM CUPID HEART...|     8.0|12/1/2010 8:26|     2.75|     17850|United Kingdom|London|
|   536365|   84029G|KNITTED UNION FLA...|     6.0|12/1/2010 8:26|     3.39|     17850|United Kingdom|London|
|   536365|   84029E|RED WOOLLY HOTTIE...|     6.0|12/1/2010 8:26|     3.39|     17850|United Kingdom|London|
|   536365|    22752|SET 7 BABUSHKA NE...|     2.0|12/1/2010 8:26|     7.65|     17850|United Kingdom|London|
|   536365

In [19]:
# Record the schema of the modified DataFrame under the next version label.
schema_store.update(version_2=df.schema)

In [20]:
# Bare expression as the cell's last line: displays every stored schema version.
schema_store

{'version_1': StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', StringType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', IntegerType(), True), StructField('Country', StringType(), True)]),
 'version_2': StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', DoubleType(), True), StructField('InvoiceDate', StringType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', IntegerType(), True), StructField('Country', StringType(), True), StructField('City', StringType(), False)])}

In [None]:
# Destination path for the DataFrame with the updated schema.
output_file = "updated_schema.csv"

# Write the DataFrame as CSV with a header row, replacing any previous output
# at this path.
# NOTE(review): Spark's writer typically produces a directory of part files at
# this path rather than a single CSV — confirm that is intended. A single-file
# alternative via pandas would be df.toPandas().to_csv(output_file, index=False).
(
    df.write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save(output_file)
)
