In [3]:
import os

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, desc, sqrt, mean, stddev, percentile_approx, last, coalesce
from pyspark.sql.types import DoubleType, DateType

In [4]:
# Start (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("NetflixStockPreprocessing")
    .getOrCreate()
)

KeyboardInterrupt: 

In [None]:
# Create directory for plots if it doesn't exist
# (exist_ok=True makes this cell safe to re-run).
# NOTE(review): the savefig calls visible in this notebook write to
# 'distribution_plots1/', 'distribution_plots2/' and 'distribution_plots3/',
# not to this directory -- confirm which layout is intended.
os.makedirs('distribution_plots', exist_ok=True)

In [None]:
# Function for detailed outlier analysis
# NOTE(review): truncated draft. The body only creates an empty dict and then
# falls off the end, so calling this version returns None. A complete function
# with the same name is defined in a later cell and replaces this one when the
# cells are run in order; this cell looks like a leftover.
def comprehensive_outlier_detection(spark_df, numeric_columns):
    outlier_results = {}

OUTLIER DETECTION HELPER

In [None]:
# Function for detailed outlier analysis
def comprehensive_outlier_detection(spark_df, numeric_columns,
                                    output_path='outliers_boxplot0.png',
                                    iqr_multiplier=1.5):
    """Flag IQR outliers per column, print a report and save boxplots.

    For every column in ``numeric_columns`` the approximate first and third
    quartiles are computed with Spark, rows outside
    ``[q1 - iqr_multiplier * iqr, q3 + iqr_multiplier * iqr]`` are collected,
    a short report is printed, and a boxplot panel is drawn. All panels are
    saved side by side in a single image.

    Args:
        spark_df: Spark DataFrame holding the data.
        numeric_columns: Names of the columns to analyse.
        output_path: File the combined boxplot figure is written to.
        iqr_multiplier: Width of the outlier fence in units of the IQR.

    Returns:
        dict mapping each analysed column name to a dict with the keys
        ``total_outliers``, ``percentage_outliers``, ``lower_bound``,
        ``upper_bound`` and ``outliers`` (a pandas DataFrame of the flagged
        rows). Columns without any non-null value are skipped and have no
        entry.
    """
    outlier_results = {}

    # The row count is the same for every column, so run that job only once
    total_rows = spark_df.count()

    # Convert to pandas for visualization
    pandas_df = spark_df.toPandas()

    # One figure holding a boxplot panel per column
    fig = plt.figure(figsize=(15, 6))
    panel_count = len(numeric_columns)

    # Analyze each numeric column
    for idx, column in enumerate(numeric_columns, 1):
        # Use the Column-based function instead of an SQL string: a name such
        # as "Adj Close" (with a space) cannot be spliced into an SQL
        # expression unquoted.
        quartiles = spark_df.select(
            percentile_approx(col(column), [0.25, 0.75], 10000).alias('quartiles')
        ).collect()[0]['quartiles']

        # The aggregate is NULL when the column has no non-null values
        if quartiles is None:
            print(f"\nOutlier Analysis for {column}:")
            print("No non-null values; skipping.")
            continue

        # Extract values
        q1 = float(quartiles[0])
        q3 = float(quartiles[1])
        iqr = q3 - q1

        # Define bounds
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr

        # Collect the outlier rows once and derive the count from them,
        # instead of running the same filter twice
        outliers = spark_df.filter(
            (col(column) < lower_bound) | (col(column) > upper_bound)
        ).toPandas()
        outliers_count = len(outliers)
        percentage_outliers = (outliers_count / total_rows) * 100 if total_rows else 0.0

        # Store results
        outlier_results[column] = {
            'total_outliers': outliers_count,
            'percentage_outliers': percentage_outliers,
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'outliers': outliers
        }

        # Print detailed outlier information
        print(f"\nOutlier Analysis for {column}:")
        print(f"Total Outliers: {outliers_count}")
        print(f"Percentage of Outliers: {percentage_outliers:.2f}%")
        print(f"Lower Bound: {lower_bound}")
        print(f"Upper Bound: {upper_bound}")

        # If there are outliers, print them
        if outliers_count > 0:
            print("\nOutlier Details:")
            print(outliers)

        # Create boxplot on an explicit axes rather than the pyplot "current" one
        ax = fig.add_subplot(1, panel_count, idx)
        sns.boxplot(x=pandas_df[column], ax=ax)
        ax.set_title(f'Boxplot of {column}')

    # Save boxplot and release the figure
    fig.tight_layout()
    fig.savefig(output_path)
    plt.close(fig)

    return outlier_results

1. DATA LOADING AND OVERVIEW

In [None]:
# Load the raw CSV: first line is the header, column types are inferred
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('NFLX.csv')
)

In [None]:
# Report the dataset's dimensions and the type of every column
row_total = df.count()
column_total = len(df.columns)
print(f"Number of Rows: {row_total}")
print(f"Number of Columns: {column_total}")

print("\nColumn Data Types:")
schema_lines = [f"{field.name}: {field.dataType}" for field in df.schema.fields]
print(*schema_lines, sep="\n")

In [None]:
# OUTLIER DETECTION
# Columns to analyse; the same list is reused by later cells
numeric_columns = [
    "Open",
    "High",
    "Low",
    "Close",
    "Adj Close",
    "Volume",
]
print("\nPerforming Outlier Detection...")
outlier_analysis = comprehensive_outlier_detection(
    spark_df=df,
    numeric_columns=numeric_columns,
)

2. DATA QUALITY ASSESSMENT

In [None]:
# Check for missing values
def check_missing_values(df):
    """Print the number and percentage of NULL values for every column of ``df``.

    All counts come from one aggregation over the DataFrame instead of one
    Spark job per column.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        None. One line per column is printed.
    """
    # First aggregate is the total row count; the rest are per-column NULL
    # counters. when() without otherwise() yields NULL for non-matching rows
    # and count() skips NULLs, so each counter only counts NULL cells.
    aggregates = [F.count(F.lit(1))] + [
        F.count(F.when(F.col(column).isNull(), 1)) for column in df.columns
    ]
    total_count, *null_counts = df.select(aggregates).collect()[0]

    for column, missing_count in zip(df.columns, null_counts):
        # Guard against an empty DataFrame (total_count == 0)
        percentage = (missing_count / total_count) * 100 if total_count else 0.0
        print(f"{column}: {missing_count} missing values ({percentage:.2f}%)")

print("\nMissing Values:")
check_missing_values(df)

In [None]:
# Duplicates = rows that disappear when fully identical records are collapsed
rows_before = df.count()
rows_after = df.dropDuplicates().count()
duplicate_count = rows_before - rows_after
print(f"\nNumber of Duplicate Records: {duplicate_count}")

3. DATA PREPROCESSING

In [None]:
# Replace the Date column with its value cast to Spark's date type
date_as_date = col("Date").cast("date")
df = df.withColumn("Date", date_as_date)

In [None]:
# Cast each column listed in numeric_columns to double
for numeric_name in numeric_columns:
    as_double = col(numeric_name).cast("double")
    df = df.withColumn(numeric_name, as_double)

In [None]:
# Fill missing values by carrying the latest non-null value forward.
# Forward fill is used here as a simple stand-in for interpolation.
# The window is ordered by Date and has no partitioning.
window_spec = Window.orderBy("Date")
for fill_name in numeric_columns:
    latest_known = last(col(fill_name), ignorenulls=True).over(window_spec)
    df = df.withColumn(fill_name, coalesce(col(fill_name), latest_known))

4. DESCRIPTIVE STATISTICS

In [None]:
# Calculate summary statistics
# Spark's aggregate functions are taken from the `F` namespace: the Python
# builtins min()/max() cannot aggregate a Column. The expressions are built
# as one flat list, because select() expects columns, not a generator of lists.
summary_aggregates = (
    ("mean", F.mean),
    ("stddev", F.stddev),
    ("min", F.min),
    ("max", F.max),
)
summary_exprs = [
    aggregate(F.col(c)).alias(f"{c}_{label}")
    for c in numeric_columns
    for label, aggregate in summary_aggregates
]
# The result is a single row with one "<column>_<statistic>" field per pair
summary_stats = df.select(summary_exprs).toPandas()

print("\nDescriptive Statistics:")
print(summary_stats)

5. DISTRIBUTION ANALYSIS

In [None]:
# Convert to pandas for visualization
# toPandas() collects the whole DataFrame into local memory; the plotting
# cells below read from this pandas copy.
pandas_df = df.toPandas()

In [None]:
# Create distribution plots
# savefig does not create missing folders, so make sure the target exists
os.makedirs('distribution_plots1', exist_ok=True)

# Size the grid from the number of columns instead of assuming exactly six
hist_grid_cols = 3
hist_grid_rows = max(1, -(-len(numeric_columns) // hist_grid_cols))  # ceiling division

fig = plt.figure(figsize=(15, 10))
# The loop variable must not be called `col`: that would overwrite the
# pyspark `col` function in the notebook namespace and break any Spark cell
# that is run afterwards.
for i, column_name in enumerate(numeric_columns, 1):
    ax = fig.add_subplot(hist_grid_rows, hist_grid_cols, i)
    sns.histplot(data=pandas_df, x=column_name, kde=True, ax=ax)
    ax.set_title(f'{column_name} Distribution')
fig.tight_layout()
fig.savefig('distribution_plots1/histograms1.png')
plt.close(fig)

In [None]:
# Create box plots
# savefig does not create missing folders, so make sure the target exists
os.makedirs('distribution_plots2', exist_ok=True)

# Draw on an explicit axes so the plot cannot land on a stale "current" figure
fig, ax = plt.subplots(figsize=(15, 5))
pandas_df[numeric_columns].boxplot(ax=ax)
ax.set_title('Box Plot of Numeric Columns')
ax.tick_params(axis='x', labelrotation=45)
fig.tight_layout()
fig.savefig('distribution_plots2/boxplots2.png')
plt.close(fig)

6. RELATIONSHIP EXPLORATION

In [None]:
# Correlation of every ordered pair of numeric columns (self-pairs included),
# stored as (column, column, value) triples
correlations = [
    (first, second, df.stat.corr(first, second))
    for first in numeric_columns
    for second in numeric_columns
]

In [None]:
# Turn the (column, column, value) triples into a long table, then reshape it
# into a square matrix for the heatmap
correlation_df = pd.DataFrame(
    data=correlations,
    columns=['Column1', 'Column2', 'Correlation'],
)
correlation_matrix = correlation_df.pivot(
    index='Column1',
    columns='Column2',
    values='Correlation',
)

In [None]:
# Create correlation heatmap
# savefig does not create missing folders, so make sure the target exists
os.makedirs('distribution_plots3', exist_ok=True)

# Draw on an explicit axes so the plot cannot land on a stale "current" figure
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, ax=ax)
ax.set_title('Correlation Heatmap')
fig.tight_layout()
fig.savefig('distribution_plots3/correlation_heatmap3.png')
plt.close(fig)

In [None]:
# Write the processed DataFrame as CSV with a header row, replacing any
# previous output at the same path
csv_writer = df.write.mode("overwrite").option("header", True)
csv_writer.csv("preprocessed_spark_data.csv")
print("\nPreprocessed data saved to 'preprocessed_spark_data.csv'")

In [None]:
# Stop Spark session
# Run this last: cells that use `spark` or `df` will not work after the
# session has been stopped.
spark.stop()