In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, abs, round as spark_round
from pyspark.sql.types import DoubleType

# Build the Spark session used by the rest of the script; getOrCreate()
# returns an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("Crypto Data Cleaning")
    .getOrCreate()
)

# Source CSV and destination path for the cleaned data
INPUT_CSV = 'crypto_tradinds.csv'
OUTPUT_CSV = 'raw_crypto.csv'

# Columns that are discarded before cleaning
columns_to_remove = [
    'max_supply',
    'crypto_type',
    'price_btc',
    'BTC_price_change_1_day',
]

# Columns a row must have a value for in order to be kept
critical_columns = [
    'trade_date',
    'price_usd',
    'crypto_name',
]

# Columns that are cast to a numeric type and checked for missing values
numeric_cols = [
    'volume',
    'price_usd',
    'market_cap',
    'capitalization_change_1_day',
    'usd_price_change_1_day',
]

# Descriptive (non-numeric) fields that must also be present
data_cols = [
    'platform_name',
    'industry_name',
]

# Read CSV. No schema is supplied or inferred here, so the numeric columns
# are cast explicitly further down.
df = spark.read.option("header", True).csv(INPUT_CSV)

# Drop unnecessary columns (only those actually present in the file)
df = df.drop(*[name for name in columns_to_remove if name in df.columns])

# Drop rows with missing critical fields
present_critical = [name for name in critical_columns if name in df.columns]
df = df.dropna(subset=present_critical)

# Cast numeric columns to Double; values that cannot be parsed become NULL
# and are removed by the numeric dropna below.
for col_name in numeric_cols:
    if col_name in df.columns:
        df = df.withColumn(col_name, col(col_name).cast(DoubleType()))
    else:
        print(f"Skipping missing numeric column: {col_name}")

# Handle 'minable' column: 1 -> True, 0 -> False. There is no otherwise()
# branch, so any other value becomes NULL.
if 'minable' in df.columns:
    df = df.withColumn(
        'minable',
        when(col('minable') == 1, True).when(col('minable') == 0, False),
    )

# Drop rows with missing numeric values
existing_numeric = [col_name for col_name in numeric_cols if col_name in df.columns]
df = df.dropna(subset=existing_numeric)

# Drop rows with a missing platform_name / industry_name. The subset is
# filtered against the schema like every other step above, so an absent
# column is reported and skipped instead of making dropna() raise.
existing_data_cols = []
for col_name in data_cols:
    if col_name in df.columns:
        existing_data_cols.append(col_name)
    else:
        print(f"Skipping missing data column: {col_name}")
df = df.dropna(subset=existing_data_cols)

# Derived Columns
def _safe_ratio(numerator, denominator):
    """Return the column ``numerator / denominator``, NULL where the denominator is 0.

    With ANSI mode off, Spark already yields NULL for a division by zero, so
    the result is unchanged. With ANSI mode on, an unguarded division by zero
    raises an error and aborts the job; the explicit ``when`` guard avoids
    that. A NULL denominator still produces NULL because the condition is
    then not true and there is no ``otherwise`` branch.
    """
    return when(denominator != 0, numerator / denominator)


usd_change = col('usd_price_change_1_day')
price = col('price_usd')

# 1-day change as a percentage of (price_usd - usd_price_change_1_day)
df = df.withColumn(
    'price_change_percentage_usd',
    _safe_ratio(usd_change, price - usd_change) * 100,
)

df = df.withColumn(
    'volume_to_market_cap_ratio',
    _safe_ratio(col('volume'), col('market_cap')),
)

# Absolute 1-day change as a fraction of price_usd
df = df.withColumn('price_volatility_score', _safe_ratio(abs(usd_change), price))

# Flags are written as the strings 'True' / 'False' (not booleans); a NULL
# comparison falls through to 'False'.
df = df.withColumn('is_price_up', when(usd_change > 0, 'True').otherwise('False'))

df = df.withColumn(
    'is_cap_up',
    when(col('capitalization_change_1_day') > 0, 'True').otherwise('False'),
)

df = df.withColumn(
    'is_liquid_token',
    when(col('volume_to_market_cap_ratio') > 0.1, 'True').otherwise('False'),
)

# Signed 1-day change as a percentage of price_usd
df = df.withColumn('daily_return_score', _safe_ratio(usd_change, price) * 100)

# Round every double/float column to 3 decimal places
float_type_names = ('double', 'float')
float_cols = [
    field.name
    for field in df.schema.fields
    if field.dataType.simpleString() in float_type_names
]
for float_col in float_cols:
    rounded = spark_round(col(float_col), 3)
    df = df.withColumn(float_col, rounded)

# Order rows by crypto_name, then trade_date, when both columns exist
sort_keys = ['crypto_name', 'trade_date']
if all(key in df.columns for key in sort_keys):
    df = df.orderBy(sort_keys)

# Write the result as CSV with a header row, replacing any previous output
writer = df.write.mode("overwrite").option("header", True)
writer.csv(OUTPUT_CSV)

print(f"Cleaned file saved to folder: {OUTPUT_CSV}")

  df = pd.read_csv(INPUT_CSV)


Error while cleaning: local variable 'numeric_cols' referenced before assignment
