In [None]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import col, regexp_replace, when, expr
import pandas as pd

# Initialize (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Transform and Combine Partitioned Files")
    .getOrCreate()
)

# HDFS directory that holds the input CSV files.
input_directory = "hdfs://namenode:9000/data"

# Names of the files to load (file1.csv, file2.csv, ..., fileN.csv).
file_names = ["sample.csv"]
# Derived from the list so the count can never drift from the actual names.
num_files = len(file_names)

if not file_names:
    raise ValueError("file_names is empty: there is nothing to load")

# HDFS URIs always use '/' as separator, so build them explicitly instead of
# relying on os.path.join, which follows the local OS convention.
file_paths = [f"{input_directory.rstrip('/')}/{name}" for name in file_names]

# Read every listed file in ONE call so the result contains the rows of all
# files. (Reading inside a loop and rebinding `df` kept only the last file.)
df = spark.read.csv(file_paths, header=True, inferSchema=True)

# Pull the combined data onto the driver as a pandas DataFrame for the
# pandas/matplotlib analysis in the following cells.
data = df.toPandas()

In [None]:
# Normalise the dtypes of the pandas copy of the data.
#
# Design notes:
# * Integer columns are cast to an explicit 'int64' (plain `int` is
#   platform-dependent on older NumPy builds). If a column contains missing
#   values it cannot be a NumPy integer, so it is kept as float64 with NaN
#   instead of raising. Both dtypes are picked up by the
#   select_dtypes(['int64', 'float64']) call in the plotting cell.
# * Text columns are converted to str, but missing entries stay missing
#   rather than becoming the literal strings 'None'/'nan', so the
#   missing-value report further down still sees them.

# Integer-valued columns (counts and identifiers).
for name in ('Age', 'Quantity', 'CustomerID', 'ProductID'):
    numeric = pd.to_numeric(data[name])  # raises on non-numeric text
    data[name] = numeric.astype('float64' if numeric.isna().any() else 'int64')

# Real-valued columns.
for name in ('TotalAmount', 'Price'):
    data[name] = data[name].astype(float)

# Free-text / categorical columns.
for name in ('Country', 'Name', 'Category', 'ProductName', 'ShippingAddress'):
    values = data[name]
    # .where() keeps the converted string where a value is present and
    # leaves NaN where the original value was missing.
    data[name] = values.astype(str).where(values.notna())

# Date columns.
for name in ('RegistrationDate', 'OrderDate', 'ShippingDate'):
    data[name] = data[name].astype('datetime64[ns]')



In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

print('start')

# Identify numerical columns of the pandas DataFrame.
numerical_cols = data.select_dtypes(include=['int64', 'float64']).columns

if len(numerical_cols) == 0:
    # plt.subplots(nrows=0) would raise; report the situation instead.
    print('No numerical columns to plot.')
else:
    # One row per numerical column, three panels per row.
    # squeeze=False keeps `axs` two-dimensional even when there is only a
    # single numerical column, so axs[i, j] indexing always works.
    fig, axs = plt.subplots(
        nrows=len(numerical_cols),
        ncols=3,
        figsize=(18, len(numerical_cols) * 5),
        squeeze=False,
    )

    # The loop variable is deliberately NOT called `col`: that name is
    # imported from pyspark.sql.functions and would be overwritten.
    for i, col_name in enumerate(numerical_cols):
        values = data[col_name].dropna()

        # Boxplot
        axs[i, 0].boxplot(values)
        axs[i, 0].set_title(f'Boxplot of {col_name}')
        axs[i, 0].set_ylabel(col_name)

        # Histogram
        axs[i, 1].hist(values, bins=30, edgecolor='black')
        axs[i, 1].set_title(f'Histogram of {col_name}')
        axs[i, 1].set_ylabel('Frequency')

        # Density plot
        sns.kdeplot(values, ax=axs[i, 2], fill=True)
        axs[i, 2].set_title(f'Density Plot of {col_name}')
        axs[i, 2].set_ylabel('Density')

    # Adjust layout
    plt.tight_layout()
    plt.show()


In [None]:
import pandas as pd

# Columns screened for outliers with the IQR rule.
numeric_columns = ['Quantity', 'Age', 'Price', 'TotalAmount']

# A value is an outlier when it lies more than this many IQRs outside the
# first/third quartile.
iqr_multiplier = 1.5

# column name -> (lower bound, upper bound)
outlier_bounds = {}
# column name -> number of rows outside the bounds
outlier_counts = {}

# The loop variable is deliberately NOT called `col`: that name is imported
# from pyspark.sql.functions and would be overwritten for the rest of the
# notebook.
for column in numeric_columns:
    values = data[column]

    Q1 = values.quantile(0.25)
    Q3 = values.quantile(0.75)
    IQR = Q3 - Q1

    lower_bound = Q1 - iqr_multiplier * IQR
    upper_bound = Q3 + iqr_multiplier * IQR
    outlier_bounds[column] = (lower_bound, upper_bound)

    # Missing values compare False on both sides, so they are never counted.
    is_outlier = (values < lower_bound) | (values > upper_bound)
    outlier_counts[column] = int(is_outlier.sum())

# Show outlier bounds
for column, (low, high) in outlier_bounds.items():
    print(f"{column}: Lower Bound = {low}, Upper Bound = {high}")

# Show count of outliers for each numeric column
for column, count in outlier_counts.items():
    print(f"{column}: Outlier Count = {count}")

In [None]:
print('start')
def check_missing_data(df):
    """Print and return per-column missing-value statistics.

    Args:
        df: pandas DataFrame to inspect.

    Returns:
        A tuple ``(counts, percentages)`` of two Series indexed by column
        name: the number of null entries per column, and that number as a
        percentage of the number of rows.
    """
    null_counts = df.isnull().sum()
    null_share = (null_counts / len(df)) * 100

    print("Missing Values Summary:")
    report = pd.DataFrame({
        'Missing Count': null_counts,
        'Percentage': null_share
    })
    print(report)

    return null_counts, null_share

# Run the missing-value report on the pandas frame, keep both result Series
# for later cells, and echo the raw per-column counts.
missing_report = check_missing_data(data)
missing_summary, missing_percentage = missing_report
print(missing_summary)

In [None]:
print('start')

# Numeric features whose pairwise correlations are examined.
selected_cols = ['Quantity', 'Age', 'Price', 'TotalAmount']

# Correlation matrix of the selected columns.
correlation_matrix = data.loc[:, selected_cols].corr()

# Fixed colour scale of [-1, 1] so colours are comparable between runs.
heatmap_options = dict(
    annot=True,
    fmt=".2f",
    cmap='coolwarm',
    vmin=-1,
    vmax=1,
    square=True,
    linewidths=.5,
)

plt.figure(figsize=(5, 5))
sns.heatmap(correlation_matrix, **heatmap_options)
plt.title('Correlation Heatmap')
plt.show()

# Display the same numbers as a plain-text table.
print("Correlation Table:", correlation_matrix, sep="\n")

In [None]:
# Collect the unique values of both columns from Spark before printing.
distinct_product_names, distinct_countries = (
    df.select(column_name).distinct().collect()
    for column_name in ("ProductName", "Country")
)

# Print each list under its own heading, one value per line.
for heading, field, rows in (
    ("Distinct Product Names:", "ProductName", distinct_product_names),
    ("\nDistinct Countries:", "Country", distinct_countries),
):
    print(heading)
    for row in rows:
        print(row[field])

In [None]:
# Column names of the Spark DataFrame, in schema order.
headers = list(df.columns)
print("Headers of the DataFrame:", headers)