In [6]:
import json
import pandas as pd

In [7]:
products = [
    {
  "product_id": "12345",
  "product_name": "Widget A",
  "price": 19.99,
  "description": "A high-quality widget.",
  "manufacturer": "Company X",
  "stock_quantity": None
},

{
  "SKU": "SKU-54321",
  "product_name": "SuperWidget",
  "price": 24.95,
  "description": "The ultimate super widget.",
  "manufacturer": "Brand Y",
  "inventory": 150
},

{
  "product_id": "12345",
  "product_name": "Widget A",
  "price": 19.99,
  "description": "A high-quality widget.",
  "manufacturer": "Company X",
  "stock_quantity": 75
}

]

for i, product in enumerate(products):
    filename = f"product_{i+1}.json"
    with open(filename, 'w') as outfile:
        json.dump(product, outfile)

In [8]:
def standardize_data(product_data):
    # Ensure consistent keys and handle missing values
    standardized_data = {
        'product_id': product_data.get('product_id', product_data.get('SKU', '')),
        'product_name': product_data.get('product_name', 'Unknown Product'),
        'price': product_data.get('price', 0),
        'description': product_data.get('description', 'No description available'),
        'manufacturer': product_data.get('manufacturer', 'Unknown Manufacturer'),
        'stock_quantity': product_data.get('stock_quantity', product_data.get('inventory', 0))
    }
    return standardized_data

In [10]:
import os
directory = '.'  # The directory where the JSON files are stored
all_files = os.listdir(directory)

# Filter the files to include only JSON files with a standardized name pattern
json_files = [file for file in all_files if file.startswith('product_') and file.endswith('.json')]

# Print the list of JSON files
print(json_files)

['product_1.json', 'product_2.json', 'product_3.json']


In [11]:
data_list = []
for filename in json_files:
    with open(filename) as f:
        data_list.append(json.load(f))

standardized_data = [standardize_data(item) for item in data_list]
df = pd.DataFrame(standardized_data)
df

Unnamed: 0,product_id,product_name,price,description,manufacturer,stock_quantity
0,12345,Widget A,19.99,A high-quality widget.,Company X,
1,SKU-54321,SuperWidget,24.95,The ultimate super widget.,Brand Y,150.0
2,12345,Widget A,19.99,A high-quality widget.,Company X,75.0


In [13]:
aggregated_df = df.groupby('product_id').agg({
    'product_name': 'first',  # Take the first value for product_name
    'price': 'mean',  # Calculate the mean price
    'description': 'first',  # Take the first value for description
    'manufacturer': 'first',  # Take the first value for manufacturer
    'stock_quantity': 'sum'  # Sum the stock_quantity values
}).reset_index()

aggregated_df

Unnamed: 0,product_id,product_name,price,description,manufacturer,stock_quantity
0,12345,Widget A,19.99,A high-quality widget.,Company X,75.0
1,SKU-54321,SuperWidget,24.95,The ultimate super widget.,Brand Y,150.0


In [14]:
df

Unnamed: 0,product_id,product_name,price,description,manufacturer,stock_quantity
0,12345,Widget A,19.99,A high-quality widget.,Company X,
1,SKU-54321,SuperWidget,24.95,The ultimate super widget.,Brand Y,150.0
2,12345,Widget A,19.99,A high-quality widget.,Company X,75.0


In [15]:
# Store the consolidated data in a central data repository or database.
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("ConsolidatedDataStorage") \
    .getOrCreate()


In [16]:
spark

In [17]:
spark_df = spark.createDataFrame(aggregated_df)

ModuleNotFoundError: No module named 'distutils'