In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain a Spark session for this ingestion notebook. getOrCreate() returns an
# already-active session if there is one, otherwise it builds a new one.
spark = (
    SparkSession.builder
    .appName("CRMClassifier-DataIngestion")
    .getOrCreate()
)

data_path = "../data/crm_sample.csv"

# Make sure the directory that will hold data_path exists.
# - exist_ok=True avoids the race between checking with os.path.exists() and
#   then calling os.makedirs().
# - Deriving the directory from data_path keeps the two in sync if the path
#   is ever changed.
# - The "." fallback covers a bare filename, where dirname() returns "" and
#   makedirs("") would raise.
os.makedirs(os.path.dirname(data_path) or ".", exist_ok=True)

# Write a small synthetic sample only when no CSV is present yet, so an
# existing file at data_path is never overwritten.
if not os.path.exists(data_path):
    import pandas as pd  # only needed to build the synthetic sample

    df = pd.DataFrame({
        "taxpayer_id": range(1, 11),
        "income": [50000, 120000, 70000, 45000, 300000, 150000, 80000, 40000, 95000, 60000],
        "transactions": [5, 50, 20, 3, 120, 60, 15, 2, 35, 10],
        "region": ["A", "B", "A", "C", "B", "B", "C", "A", "C", "A"],
        "risk": ["Low", "High", "Medium", "Low", "High", "High", "Medium", "Low", "Medium", "Low"],
    })
    df.to_csv(data_path, index=False)

# Load the CSV. The first row supplies column names, and Spark infers the
# column types (inferSchema costs an extra pass over the file).
df_spark = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)

# Quick sanity checks, in order:
# - the inferred schema
# - a 5-row preview
# - the total row count
# - the distribution of the "risk" column
# - the rows whose income exceeds 100000
df_spark.printSchema()
df_spark.show(n=5)

record_total = df_spark.count()
print("Total records:", record_total)

risk_distribution = df_spark.groupBy("risk").count()
risk_distribution.show()

high_income_rows = df_spark.where(col("income") > 100000)
high_income_rows.show()

# Persist the loaded DataFrame as Parquet, replacing any output from a previous
# run. This is presumably the checkpoint that later pipeline steps read
# (TODO confirm against downstream notebooks).
checkpoint_path = "../data/crm_ingested.parquet"
parquet_writer = df_spark.write.mode("overwrite")
parquet_writer.parquet(checkpoint_path)
print("✅ Data saved at", checkpoint_path)