In [3]:
# Directory holding the raw AdventureWorks "Production" CSV exports.
base_path = "../raw-files/Production Data/"

# Source tables; each one is exported as "Production <Table>.csv".
production_tables = [
    'Culture',
    'Document',
    'Illustration',
    'Product',
    'ProductCategory',
    'ProductDescription',
    'ProductDocument',
    'ProductModel',
    'ProductModelIllustration',
    'ProductModelProductDescriptionCulture',
    'ProductPhoto',
    'ProductProductPhoto',
    'ProductReview',
    'ProductSubcategory',
    'UnitMeasure',
]

# CSV file names, in the order the loading cell iterates over them.
file_names = [f'Production {table}.csv' for table in production_tables]

In [4]:
from pyspark.sql import SparkSession

# Initialize (or reuse, via getOrCreate) the Spark session for this notebook.
spark = SparkSession.builder.appName("adventure-works").getOrCreate()


def _frame_key(file_name):
    """Derive the dict key for a CSV file, e.g. 'Production ProductModel.csv' -> 'production_productmodel'."""
    return file_name.replace(' ', '_').replace('.csv', '').lower()


# base_path may already end with '/', so strip it before joining; otherwise every
# path would contain '//' (e.g. ".../Production Data//Production Culture.csv").
_source_dir = base_path.rstrip('/')

# Read each file into a DataFrame, keyed by _frame_key(file_name).
# No inferSchema option is set, so Spark reads every column as a string.
dataframes = {
    _frame_key(file_name): (
        spark.read.format('csv')
        .option('header', 'true')
        .load(f'{_source_dir}/{file_name}')
    )
    for file_name in file_names
}

# Accessing individual DataFrames
culture_df = dataframes['production_culture']
document_df = dataframes['production_document']
illustration_df = dataframes['production_illustration']
product_df = dataframes['production_product']
product_category_df = dataframes['production_productcategory']
product_description_df = dataframes['production_productdescription']
product_document_df = dataframes['production_productdocument']
product_model_df = dataframes['production_productmodel']
product_model_illustration_df = dataframes['production_productmodelillustration']
product_model_product_description_culture_df = dataframes['production_productmodelproductdescriptionculture']
product_photo_df = dataframes['production_productphoto']
product_product_photo_df = dataframes['production_productproductphoto']
product_review_df = dataframes['production_productreview']
product_subcategory_df = dataframes['production_productsubcategory']
unit_measure_df = dataframes['production_unitmeasure']

24/06/24 13:51:53 WARN Utils: Your hostname, Joshs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.100.2 instead (on interface en0)
24/06/24 13:51:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/24 13:51:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/24 13:51:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/06/24 13:51:58 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
def rename_columns(df, rename_mapping):
    """Return a copy of ``df`` with columns renamed per ``rename_mapping``.

    Args:
        df: DataFrame exposing ``withColumnRenamed(existing, new)``.
        rename_mapping: dict of ``{existing_name: new_name}``. Renames are applied
            one at a time in the dict's iteration order.

    Returns:
        The DataFrame after every rename has been applied. The input is not mutated,
        because each ``withColumnRenamed`` call returns a new DataFrame.
    """
    renamed = df
    for pair in rename_mapping.items():
        # PySpark treats a rename of a column that is not present as a no-op.
        renamed = renamed.withColumnRenamed(*pair)
    return renamed

# Per-DataFrame column renames. Many source tables share generic column names
# (Name, ModifiedDate, rowguid); prefixing them with the table name keeps them
# distinct once the tables are joined into one wide DataFrame.
rename_mappings = {
    'culture_df': {'Name': 'CultureName', 'ModifiedDate': 'CultureModifiedDate'},
    'document_df': {'ModifiedDate': 'DocumentModifiedDate', 'rowguid': 'Documentrowguid'},
    'illustration_df': {'ModifiedDate': 'IllustrationModifiedDate'},
    'product_df': {'Name': 'ProductName', 'ModifiedDate': 'ProductModifiedDate'},
    'product_category_df': {'Name': 'ProductCategoryName', 'ModifiedDate': 'ProductCategoryModifiedDate', 'rowguid': 'ProductCategoryrowguid'},
    'product_description_df': {'Name': 'ProductDescriptionName', 'ModifiedDate': 'ProductDescriptionModifiedDate', 'rowguid': 'ProductDescriptionrowguid'},
    'product_document_df': {'ModifiedDate': 'ProductDocumentModifiedDate'},
    'product_model_df': {'Name': 'ProductModelName', 'ModifiedDate': 'ProductModelModifiedDate', 'rowguid': 'ProductModelrowguid'},
    'product_model_illustration_df': {'ModifiedDate': 'ProductModelIllustrationModifiedDate'},
    'product_model_product_description_culture_df': {'ModifiedDate': 'ProductModelProductDescriptionCultureModifiedDate'},
    'product_photo_df': {'ModifiedDate': 'ProductPhotoModifiedDate'},
    'product_product_photo_df': {'ModifiedDate': 'ProductProductPhotoModifiedDate'},
    'product_review_df': {'ModifiedDate': 'ProductReviewModifiedDate'},
    'product_subcategory_df': {'Name': 'ProductSubcategoryName', 'ModifiedDate': 'ProductSubcategoryModifiedDate', 'rowguid': 'ProductSubcategoryrowguid'},
    'unit_measure_df': {'Name': 'UnitMeasureName', 'ModifiedDate': 'UnitMeasureModifiedDate'}
}

# Rename every DataFrame with the mapping stored under its variable name.
# The (key, frame) pairs capture the current bindings first; the list is fully
# built before being unpacked back into the same names, in the same order.
(
    culture_df,
    document_df,
    illustration_df,
    product_df,
    product_category_df,
    product_description_df,
    product_document_df,
    product_model_df,
    product_model_illustration_df,
    product_model_product_description_culture_df,
    product_photo_df,
    product_product_photo_df,
    product_review_df,
    product_subcategory_df,
    unit_measure_df,
) = [
    rename_columns(frame, rename_mappings[key])
    for key, frame in (
        ('culture_df', culture_df),
        ('document_df', document_df),
        ('illustration_df', illustration_df),
        ('product_df', product_df),
        ('product_category_df', product_category_df),
        ('product_description_df', product_description_df),
        ('product_document_df', product_document_df),
        ('product_model_df', product_model_df),
        ('product_model_illustration_df', product_model_illustration_df),
        ('product_model_product_description_culture_df', product_model_product_description_culture_df),
        ('product_photo_df', product_photo_df),
        ('product_product_photo_df', product_product_photo_df),
        ('product_review_df', product_review_df),
        ('product_subcategory_df', product_subcategory_df),
        ('unit_measure_df', unit_measure_df),
    )
]

In [6]:
from pyspark.sql.functions import col

# UnitMeasure columns copied onto each product for every unit-of-measure foreign key.
_UNIT_MEASURE_COLUMNS = ("UnitMeasureCode", "UnitMeasureName", "UnitMeasureModifiedDate")


def _left_join_all(df, steps):
    """Left-join ``df`` with each ``(right_df, key)`` in ``steps``, in order.

    Each join uses the shared column name ``key``, so Spark keeps a single copy
    of that key column in the result.
    """
    for right_df, key in steps:
        df = df.join(right_df, key, "left")
    return df


def _join_unit_measure(details_df, measure_df, fk_column, prefix):
    """Left-join ``measure_df`` onto ``details_df`` where ``details_df[fk_column] == UnitMeasureCode``.

    All columns of ``details_df`` are kept. The UnitMeasure columns are appended
    as ``f"{prefix}_<column>"`` (e.g. ``Size_UnitMeasureName``), which lets the
    same lookup table be joined once per role without name clashes.
    """
    alias = f"{prefix.lower()}_measure"
    return details_df.join(
        measure_df.alias(alias),
        details_df[fk_column] == col(f"{alias}.UnitMeasureCode"),
        "left",
    ).select(
        details_df["*"],  # every column already on the left side
        *[col(f"{alias}.{name}").alias(f"{prefix}_{name}") for name in _UNIT_MEASURE_COLUMNS],
    )


# Product -> subcategory -> category, then product model.
product_details_df = _left_join_all(product_df, [
    (product_subcategory_df, "ProductSubcategoryID"),
    (product_category_df, "ProductCategoryID"),
    (product_model_df, "ProductModelID"),
])

print(len(product_details_df.columns))

# Product references UnitMeasure twice (size and weight), so join it once per role.
product_details_df = _join_unit_measure(product_details_df, unit_measure_df, "SizeUnitMeasureCode", "Size")
product_details_df = _join_unit_measure(product_details_df, unit_measure_df, "WeightUnitMeasureCode", "Weight")

print(len(product_details_df.columns))

# NOTE(review): the joins below chain several tables keyed on ProductModelID / ProductID.
# If those tables hold more than one row per key (likely for bridge tables; confirm),
# each product row is repeated once per combination of illustration x description/culture
# x document x photo x review. Confirm this cross-product grain is the intended output.
product_details_df = _left_join_all(product_details_df, [
    (product_model_illustration_df, "ProductModelID"),
    (illustration_df, "IllustrationID"),
    (product_model_product_description_culture_df, "ProductModelID"),
    (product_description_df, "ProductDescriptionID"),
    (culture_df, "CultureID"),
])

print(len(product_details_df.columns))

# Product documents, photos and reviews.
product_details_df = _left_join_all(product_details_df, [
    (product_document_df, "ProductID"),
    (document_df, "DocumentNode"),
    (product_product_photo_df, "ProductID"),
    (product_photo_df, "ProductPhotoID"),
    (product_review_df, "ProductID"),
])

print(len(product_details_df.columns))

37
43
55
82


In [7]:
import os

# Destination: one CSV file (not a Spark part-file directory).
output_path = "../denormalized-files/production.csv"

# pandas.to_csv does not create missing parent directories, so create it up front.
# Guard the empty-dirname case (a bare file name), which os.makedirs would reject.
output_dir = os.path.dirname(output_path)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

# NOTE(review): toPandas() collects the entire joined DataFrame onto the driver.
# If the row fan-out from the joins makes it large, this is the likely memory
# bottleneck. Consider product_details_df.coalesce(1).write.csv(...) if a
# part-file directory is acceptable instead of a single file.
product_details_pd_df = product_details_df.toPandas()

# Write through pandas so the result is a single file.
product_details_pd_df.to_csv(output_path, index=False, header=True)

24/06/24 13:52:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


CodeCache: size=131072Kb used=34650Kb max_used=34650Kb free=96422Kb
 bounds [0x00000001069e8000, 0x0000000108bf8000, 0x000000010e9e8000]
 total_blobs=12283 nmethods=11362 adapters=834
 compilation: disabled (not enough contiguous free space left)
