In [9]:
import psycopg2
from psycopg2 import OperationalError
import pandas as pd

# Define your database credentials
db_config = {
    "dbname": "walmart",
    "user": "postgres", 
    "password": "123456",
    "host": "localhost",  # e.g., 'localhost' or '127.0.0.1'
    "port": "5432"   # default PostgreSQL port is 5432
}

# Initialize cursor and connection variables
connection = None
cursor = None

def print_error_details(error):
    print(f"Error type: {type(error).__name__}")
    print(f"Error details: {error}")

# Connect to the PostgreSQL database
try:
    connection = psycopg2.connect(**db_config)
    cursor = connection.cursor()

    # Define the query to get data from the module table
    query = "select * from grocery_sales;"

    # Execute the query
    cursor.execute(query)

    # Fetch all rows from the executed query
    data = cursor.fetchall()
    colnames = [desc[0] for desc in cursor.description]

    # Convert to DataFrame
    grocery_sales= pd.DataFrame(data, columns=colnames)



except OperationalError as op_err:
    print("OperationalError: Could not connect to the PostgreSQL database.")
    print_error_details(op_err)

except Exception as error:
    print("An error occurred while connecting to the PostgreSQL database.")
    print_error_details(error)

finally:
    # Close the cursor and connection
    if cursor:
        cursor.close()
    if connection:
        connection.close()


In [10]:
grocery_sales.head()

Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales
0,0,1,2010-02-05,1,24924.5
1,1,1,2010-02-05,26,11737.12
2,2,1,2010-02-05,17,13223.76
3,3,1,2010-02-05,45,37.44
4,4,1,2010-02-05,28,1085.29


In [11]:
import pandas as pd

# Path to the Parquet file
parquet_file_path = 'extra_data.parquet'

# Read the Parquet file into a DataFrame
extra_data = pd.read_parquet(parquet_file_path)




In [12]:
extra_data.head()

Unnamed: 0,index,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Type,Size
0,0,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
1,1,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
2,2,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
3,3,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
4,4,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0


In [13]:
def extract(df1,df2):
  merged_df = pd.merge(df1, df2, on='index')
  return merged_df

merged_df=extract(grocery_sales,extra_data)

In [14]:
merged_df.head()

Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Type,Size
0,0,1,2010-02-05,1,24924.5,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
1,1,1,2010-02-05,26,11737.12,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
2,2,1,2010-02-05,17,13223.76,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
3,3,1,2010-02-05,45,37.44,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
4,4,1,2010-02-05,28,1085.29,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0


In [17]:
merged_df.isna().any()

index           False
Store_ID        False
Date             True
Dept            False
Weekly_Sales     True
IsHoliday       False
Temperature     False
Fuel_Price      False
MarkDown1       False
MarkDown2       False
MarkDown3       False
MarkDown4       False
MarkDown5       False
CPI              True
Unemployment     True
Type            False
Size            False
dtype: bool

In [34]:
# Convert 'Date' column to datetime
merged_df['Date'] = pd.to_datetime(merged_df['Date'], errors='coerce')

In [54]:
def transform(df):
    # Fill NaN values in 'Date' column with the minimum date from the same column
    df['Date'].fillna(df['Date'].min(), inplace=True)
    df['Weekly_Sales'].fillna(0, inplace=True)
    df['CPI'].fillna(0, inplace=True)
    df['Unemployment'].fillna(0, inplace=True)
    # Extract the month from the 'Date' column and create a new column 'Month'
    df["Month"]=df['Date'].dt.month
    # Filter rows where 'Weekly_Sales' is greater than 10,000
    df=df[df['Weekly_Sales']>10000]
    # Drop unnecessary columns, keep  `"Store_ID"`- `"Month"``"Dept"` "IsHoliday", "Weekly_Sales"` "CPI",` Unemployment"`"
    clean_data = df[["Store_ID","Month","Dept","IsHoliday","Weekly_Sales","CPI","Unemployment"]]
    
    return clean_data

clean_data=transform(merged_df)    
    

In [55]:
clean_data

Unnamed: 0,Store_ID,Month,Dept,IsHoliday,Weekly_Sales,CPI,Unemployment
0,1,2,1,0,24924.5,211.096358,8.106
1,1,2,26,0,11737.12,211.096358,8.106
2,1,2,17,0,13223.76,211.096358,8.106
5,1,2,79,0,46729.77,211.096358,0.000
6,1,2,55,0,21249.31,211.096358,0.000
...,...,...,...,...,...,...,...
19994,2,9,10,0,39335.53,222.217440,6.565
19995,2,9,11,0,16955.5,222.217440,6.565
19996,2,9,80,0,25811.35,222.217440,6.565
19997,2,9,74,0,14724.8,222.217440,6.565


In [66]:
def avg_monthly_sales(clean_data):
    #
    agg_data=clean_data.groupby("Month")["Weekly_Sales"].mean().reset_index()
    agg_data.columns=["Month","Avg_Sales"]
    # Ensure there are no NaN values in 'Avg_Sales' before rounding
    agg_data['Avg_Sales'] = agg_data['Avg_Sales'].fillna(0)
    agg_data["Avg_Sales"]=agg_data["Avg_Sales"].round(2)

    return agg_data
agg_data=avg_monthly_sales(clean_data)
agg_data.head()

Unnamed: 0,Month,Avg_Sales
0,1,40001.26
1,2,40749.51
2,3,39790.23
3,4,40484.66
4,5,40077.05


In [76]:
import pandas as pd

def load(clean_data, agg_data, clean_data_path, agg_data_path):

    
    # Save the cleaned DataFrame to CSV without an index
    clean_data.to_csv(clean_data_path, index=False)
    
    # Save the aggregated DataFrame to CSV without an index
    agg_data.to_csv(agg_data_path, index=False)

# Example usage:
# Assuming `clean_data` and `agg_data` are your DataFrames
clean_data_path = 'clean_data.csv'
agg_data_path = 'agg_data.csv'

# Call the load function to save the DataFrames
load(clean_data, agg_data, clean_data_path, agg_data_path)


In [78]:
import os

def validation(file_path):
    
    return os.path.isfile(file_path)
    
    

clean_data_path = 'clean_data.csv'
agg_data_path = 'agg_data.csv'

# Validate the existence of each CSV file
clean_data_exists = validation(clean_data_path)
agg_data_exists = validation(agg_data_path)

# Print the validation result
if clean_data_exists and agg_data_exists:
    print("Both files exist.")
else:
    print("One or both files do not exist.")


Both files exist.
