### Importing necessary libraries

In [24]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
# Install the psycopg2 package
%pip install psycopg2-binary
import psycopg2


Note: you may need to restart the kernel to use updated packages.


### Saving the files to separate variables

In [25]:
# Folder that holds the five source CSV exports.
base_path = r'F:\GUVI\Projects\P2 -Data Spark'


def _csv_path(file_name):
    """Return the full path of ``file_name`` inside ``base_path``."""
    return os.path.join(base_path, file_name)


# customers.csv is the only file read with an explicit encoding argument;
# the other four are read with read_csv's defaults.
sales = pd.read_csv(_csv_path('sales.csv'))
customers = pd.read_csv(_csv_path('customers.csv'), encoding='ISO 8859-1')
products = pd.read_csv(_csv_path('products.csv'))
exchange_rates = pd.read_csv(_csv_path('exchange_rates.csv'))
stores = pd.read_csv(_csv_path('stores.csv'))

### Identifying the columns in each dataframe

In [26]:
# Registry of the loaded frames, keyed by the label used in the printed report.
dataframes = dict(
    sales=sales,
    customers=customers,
    products=products,
    exchange_rates=exchange_rates,
    stores=stores,
)

for name, df in dataframes.items():
    # Anything in the registry that is not a DataFrame is left untouched.
    if not isinstance(df, pd.DataFrame):
        continue

    # Normalise the headers in place: lower-case first, then swap spaces for
    # underscores, so later cells can use snake_case column names.
    lowered = df.columns.str.lower()
    df.columns = lowered.str.replace(' ', '_')

    print(f"Columns for '{name}' after transformation:")
    print(df.columns, "\n")


Columns for 'sales' after transformation:
Index(['order_number', 'line_item', 'order_date', 'delivery_date',
       'customerkey', 'storekey', 'productkey', 'quantity', 'currency_code'],
      dtype='object') 

Columns for 'customers' after transformation:
Index(['customerkey', 'gender', 'name', 'city', 'state_code', 'state',
       'zip_code', 'country', 'continent', 'birthday'],
      dtype='object') 

Columns for 'products' after transformation:
Index(['productkey', 'product_name', 'brand', 'color', 'unit_cost_usd',
       'unit_price_usd', 'subcategorykey', 'subcategory', 'categorykey',
       'category'],
      dtype='object') 

Columns for 'exchange_rates' after transformation:
Index(['date', 'currency', 'exchange'], dtype='object') 

Columns for 'stores' after transformation:
Index(['storekey', 'country', 'state', 'square_meters', 'open_date'], dtype='object') 



### Checking for null values & data types in each df

In [38]:
# Frames to audit, keyed by the label shown in each section banner.
dataframes = dict(zip(
    ('sales', 'customers', 'products', 'exchange_rates', 'stores'),
    (sales, customers, products, exchange_rates, stores),
))

# One report section per frame: null count per column, then column dtypes.
for name in dataframes:
    df = dataframes[name]
    print(f"--- {name.upper()} ---")
    print(f"Null values per column:\n{df.isnull().sum()}")
    print("\nData types:")
    print(df.dtypes)
    print("\n")


--- SALES ---
Null values per column:
order_number     0
line_item        0
order_date       0
delivery_date    0
customerkey      0
storekey         0
productkey       0
quantity         0
currency_code    0
dtype: int64

Data types:
order_number      int64
line_item         int64
order_date       object
delivery_date    object
customerkey       int64
storekey          int64
productkey        int64
quantity          int64
currency_code    object
dtype: object


--- CUSTOMERS ---
Null values per column:
customerkey     0
gender          0
name            0
city            0
state_code     10
state           0
zip_code        0
country         0
continent       0
birthday        0
dtype: int64

Data types:
customerkey     int64
gender         object
name           object
city           object
state_code     object
state          object
zip_code       object
country        object
continent      object
birthday       object
dtype: object


--- PRODUCTS ---
Null values per column:
productk

### Checking how many rows and columns are present in each df

In [28]:
# Print the row count, column count and column names of every registered frame.
# Relies on the `dataframes` dict built in an earlier cell.
for name in dataframes:
    df = dataframes[name]
    rows = df.shape[0]
    columns = df.shape[1]
    column_names = list(df.columns)
    print(f"--- {name.upper()} ---")
    print(f"Number of rows: {rows}")
    print(f"Number of columns: {columns}")
    print(f"Column names: {', '.join(column_names)}\n")

--- SALES ---
Number of rows: 62884
Number of columns: 9
Column names: order_number, line_item, order_date, delivery_date, customerkey, storekey, productkey, quantity, currency_code

--- CUSTOMERS ---
Number of rows: 15266
Number of columns: 10
Column names: customerkey, gender, name, city, state_code, state, zip_code, country, continent, birthday

--- PRODUCTS ---
Number of rows: 2517
Number of columns: 10
Column names: productkey, product_name, brand, color, unit_cost_usd, unit_price_usd, subcategorykey, subcategory, categorykey, category

--- EXCHANGE_RATES ---
Number of rows: 11215
Number of columns: 3
Column names: date, currency, exchange

--- STORES ---
Number of rows: 67
Number of columns: 5
Column names: storekey, country, state, square_meters, open_date



#### Changing date time and data type formats

In [31]:
# order_date is parsed strictly: any value not in %m/%d/%Y form raises.
# Only the calendar date is kept (datetime.date objects).
sales['order_date'] = pd.to_datetime(sales['order_date'], format='%m/%d/%Y').dt.date

# delivery_date is parsed leniently (errors='coerce'), so blank or unparseable
# values become missing.  They are intentionally left missing instead of being
# filled with the integer 0: a 0 mixed in with date objects makes the column
# mixed-type, so it is no longer a clean date column for date arithmetic or
# for the later SQL export.
sales['delivery_date'] = pd.to_datetime(
    sales['delivery_date'], format='%m/%d/%Y', errors='coerce'
).dt.date

# Make sure currency codes are plain strings.
sales.loc[:, 'currency_code'] = sales['currency_code'].astype(str)


In [32]:
# Make sure the two location columns are plain strings.
for text_col in ('country', 'state'):
    stores[text_col] = stores[text_col].astype(str)

# Parse opening dates leniently (bad values become missing) and keep only the
# calendar date.
opened = pd.to_datetime(stores['open_date'], format='%m/%d/%Y', errors='coerce')
stores['open_date'] = opened.dt.date

# Fill missing floor areas with the mean of the known ones.
mean_area = stores['square_meters'].mean()
stores['square_meters'] = stores['square_meters'].fillna(mean_area)

In [34]:
# Parse birthdays leniently (unparseable values become missing) and store only
# the calendar date.
birthdays = pd.to_datetime(customers['birthday'], format='%m/%d/%Y', errors='coerce')
customers['birthday'] = birthdays.dt.date

In [35]:
# Parse the rate dates leniently (unparseable values become missing) and store
# only the calendar date.
rate_dates = pd.to_datetime(exchange_rates['date'], format='%m/%d/%Y', errors='coerce')
exchange_rates['date'] = rate_dates.dt.date

##### Removing $ from respective columns

In [33]:
# Both USD columns get the same treatment: drop every dollar sign and comma,
# then cast what is left to float.
for price_col in ('unit_cost_usd', 'unit_price_usd'):
    stripped = products[price_col].replace(r'[\$,]', '', regex=True)
    products[price_col] = stripped.astype(float)


#### Dropping duplicate rows from each dataframe, if any

In [36]:
# Remove fully duplicated rows from every frame.  The drop is done in place so
# that existing references to these frame objects see the change too.
for frame in (sales, exchange_rates, customers, products, stores):
    frame.drop_duplicates(inplace=True)

#### Creating a MySQL connection and the database - DataSpark

In [39]:
import pymysql

# Connect to the local MySQL server (no default schema selected yet).
# NOTE(review): the credentials are hard-coded here; consider reading them from
# the environment or a prompt before sharing this notebook.
connection = pymysql.connect(host='localhost', user='root', password='root', port=3306)

try:
    database_name = "DataSpark"

    # The cursor is managed by a `with` block so it is closed even when
    # execute() raises, and so that no cleanup code touches an unbound
    # `cursor` name if creating the cursor itself fails.
    with connection.cursor() as cursor:
        # IF NOT EXISTS keeps this cell re-runnable: when the database is
        # already there the statement succeeds without creating anything.
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    print(f"Database {database_name} created successfully")

except pymysql.MySQLError as error:
    # Report the server/driver error instead of aborting the notebook run.
    print(f"Error occurred: {error}")

finally:
    # Always release the connection, whether or not the statement succeeded.
    connection.close()


Database DataSpark created successfully


#### Importing the cleaned dfs into the MySQL database - DataSpark

In [41]:
from sqlalchemy import create_engine

# Connection settings for the local MySQL server.
host = 'localhost'
user = 'root'
password = 'root'
port = 3306
database_name = "DataSpark"

# SQLAlchemy URL using the PyMySQL driver, and the engine built from it.
engine_string = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database_name}"
engine = create_engine(engine_string)

# (table name, frame) pairs, listed in the order they are written.
export_plan = [
    ("customers", customers),
    ("products", products),
    ("exchange_rates", exchange_rates),
    ("stores", stores),
    ("sales", sales),
]

# Each table is replaced if it already exists; the frame index is not written.
for table_name, frame in export_plan:
    frame.to_sql(table_name, engine, if_exists='replace', index=False)
    print(f"successfully imported {table_name} to sql")


successfully imported customers to sql
successfully imported products to sql
successfully imported exchange_rates to sql
successfully imported stores to sql
successfully imported sales to sql
