In [None]:
!pip install pyspark



In [None]:
#hello

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
# # ================================
# # 🔥 PYSPARK MEGA PRACTICE NOTEBOOK (Beginner → Intermediate)
# # One-cell, comment/uncomment based learning plan (7 Days)
# # Works best on Google Colab
# # ================================

# # ================================
# # 0) INSTALL & SETUP
# # ================================
# # If running on Colab, install pyspark (skip on Databricks or local where Spark is preinstalled)
# # !pip -q install pyspark==3.5.1

import os, io, json, math, random, pandas as pd, numpy as np
from datetime import datetime, date, timedelta

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window


# ==== Start Spark Session ====
spark = SparkSession.builder.appName("GitHub_PySpark").getOrCreate()

# ==== GitHub RAW URLs ====
base = "https://raw.githubusercontent.com/theabhinaykumar/csv/main/"

user_reviews   = base + "user_reviews.csv"
apps           = base + "apps.csv"
db1_sales      = base + "DB1%20sales.csv"
hospital       = base + "hospital.csv"
ab_nyc_2019    = base + "AB_NYC_2019.csv"
creditcard     = base + "creditcard.csv"
creditcard_1   = base + "creditcard%201.csv"

# ==== Load CSVs into Spark DataFrames ====
# spark.read.csv() cannot read http(s) URLs directly: the Hadoop HTTP(S)
# filesystem does not implement the file-status / listing calls Spark needs,
# so the read fails. Each file is therefore downloaded to local disk first
# and Spark reads the local copy.
CSV_CACHE_DIR = "/tmp/github_csv"


def read_github_csv(url, cache_dir=CSV_CACHE_DIR, refresh=False):
    """Download a CSV over HTTP(S) and load it as a Spark DataFrame.

    Args:
        url: Raw file URL. Percent-encoded names such as ``DB1%20sales.csv``
            are decoded and sanitized before saving.
        cache_dir: Local directory that holds the downloaded copies.
        refresh: Re-download even if a local copy already exists.

    Returns:
        A Spark DataFrame read with ``header=True`` and ``inferSchema=True``.

    NOTE: the file is stored on the driver's local disk. That suits local
    mode (e.g. Colab). On a multi-node cluster, place the file on shared
    storage (HDFS/S3/DBFS) instead.
    """
    import pathlib
    import re
    import urllib.parse
    import urllib.request

    # Decode %20 etc., then keep only characters that are safe in both the
    # local filesystem and a Hadoop path (no spaces, no '%').
    raw_name = urllib.parse.unquote(
        os.path.basename(urllib.parse.urlparse(url).path))
    local_name = re.sub(r"[^A-Za-z0-9._-]", "_", raw_name) or "download.csv"
    local_path = os.path.abspath(os.path.join(cache_dir, local_name))

    if refresh or not os.path.exists(local_path):
        os.makedirs(cache_dir, exist_ok=True)
        # Download to a temporary name and rename afterwards, so an
        # interrupted download never leaves a truncated file for later runs.
        tmp_path = local_path + ".part"
        urllib.request.urlretrieve(url, tmp_path)
        os.replace(tmp_path, local_path)

    return (spark.read
            .option("header", True)
            .option("inferSchema", True)
            .csv(pathlib.Path(local_path).as_uri()))


df_reviews   = read_github_csv(user_reviews)
df_apps      = read_github_csv(apps)
df_db1sales  = read_github_csv(db1_sales)
df_hospital  = read_github_csv(hospital)
df_abnyc     = read_github_csv(ab_nyc_2019)
df_credit    = read_github_csv(creditcard)
df_credit_1  = read_github_csv(creditcard_1)

# ==== Check data ====
df_reviews.show(5)
df_apps.show(5)
df_hospital.printSchema()
df_abnyc.describe().show()
# Print explicitly: a bare expression is only echoed when it is the cell's last line.
print("creditcard rows:", df_credit.count())


# spark = (SparkSession.builder
#          .appName("PySpark_Mega_Practice")
#          # .config("spark.sql.shuffle.partitions", "200")  # Tune if needed
#          .getOrCreate())

# print("✅ Spark version:", spark.version)
# sc = spark.sparkContext
# sc.setLogLevel("WARN")

# # OPTIONAL: checkpoint (useful for complex pipelines)
# spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")

# # ================================
# # 1) DATA LOADING OPTIONS
# # ================================
# # Choose one: "github" (raw URL), "local_upload" (manual upload), "drive" (Google Drive path)
# SOURCE = "github"  # ← change to "local_upload" or "drive" when needed

# # GitHub RAW CSV (public dataset). You can replace with your own RAW CSV link.
# GITHUB_RAW_CSV_URL = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/tips.csv"  # small dataset

# # If using Google Drive, mount and set your CSV path (example):
# DRIVE_CSV_PATH = "/content/drive/MyDrive/data/myfile.csv"

# # ---- Helpers ----
# def load_from_github(url: str):
#     """
#     Best-practice for GitHub CSV on Colab:
#     - pandas.read_csv over HTTPS
#     - convert pandas → Spark
#     """
#     pdf = pd.read_csv(url)
#     return spark.createDataFrame(pdf)

# def load_from_local_upload():
#     """
#     Upload from your laptop to Colab.
#     It will prompt a file chooser UI.
#     """
#     from google.colab import files  # works on Colab
#     uploaded = files.upload()
#     if not uploaded:
#         raise RuntimeError("No file uploaded.")
#     fname = list(uploaded.keys())[0]
#     print("Uploaded:", fname)
#     return (spark.read
#             .option("header", True)
#             .option("inferSchema", True)
#             .option("multiLine", True)
#             .option("escape", "\"")
#             .csv(fname))

# def load_from_drive(path: str):
#     """
#     Read a CSV already present on filesystem (e.g., Drive).
#     """
#     if not os.path.isdir("/content/drive"):
#         from google.colab import drive
#         drive.mount('/content/drive', force_remount=True)
#     return (spark.read
#             .option("header", True)
#             .option("inferSchema", True)
#             .option("multiLine", True)
#             .option("escape", "\"")
#             .csv(path))

# # ---- Load main DataFrame: df ----
# if SOURCE == "github":
#     df = load_from_github(GITHUB_RAW_CSV_URL)
# elif SOURCE == "local_upload":
#     df = load_from_local_upload()
# elif SOURCE == "drive":
#     df = load_from_drive(DRIVE_CSV_PATH)
# else:
#     raise ValueError("SOURCE must be 'github', 'local_upload', or 'drive'")

# print("\n=== DATA PREVIEW ===")
# print("Rows:", df.count())
# print("Columns:", df.columns)
# df.printSchema()
# df.show(5, truncate=False)

# # Cache for faster iterative analysis
# df.cache()


# # =========================================================
# # DAY 1 — BASICS: view, schema, select, filter, simple stats
# # =========================================================

# # UNCOMMENT this block to practice Day-1
# # """
# print("\n================= DAY 1 =================")
# # Show top rows
# df.show(10)

# # Schema (column names + data types)
# df.printSchema()

# # Column list, count
# print("Total Rows:", df.count())
# print("Total Cols :", len(df.columns))
# print("Columns    :", df.columns)

# # Select specific cols (change as per your CSV)
# some_cols = df.columns[:3]  # first 3 columns for demo
# df.select(*some_cols).show(5)

# # Rename columns safely (example)
# renamed_df = df
# # Example: if your dataset has "size (cm)" type names, you can rename like:
# # for c in df.columns:
# #     renamed_df = renamed_df.withColumnRenamed(c, c.replace(" ", "_").replace("(", "").replace(")", ""))

# # Basic describe (numeric cols only)
# print("\n--- Describe numeric cols ---")
# df.describe().show()

# # Simple filtering (example numeric col: 'total_bill' if using tips.csv)
# num_col = None
# for c, t in df.dtypes:
#     if t in ("int", "bigint", "double", "float", "long", "decimal"):
#         num_col = c
#         break

# if num_col:
#     print(f"\n--- Filter {num_col} > average ---")
#     avg_val = df.select(F.avg(F.col(num_col))).first()[0]
#     df.filter(F.col(num_col) > avg_val).show(5)

# # Distinct counts per column
# print("\n--- Distinct counts per column ---")
# distincts = df.select([F.countDistinct(F.col(c)).alias(c) for c in df.columns])
# distincts.show()

# # Sort (orderBy)
# if num_col:
#     print(f"\n--- Top 5 rows by {num_col} desc ---")
#     df.orderBy(F.col(num_col).desc()).show(5)
# # """


# # =========================================================
# # DAY 2 — SELECT/ALIAS, WHERE/FILTER, BOOLEAN LOGIC, LIKE/IN/IS NULL
# # =========================================================

# # UNCOMMENT to practice Day-2
# # """
# print("\n================= DAY 2 =================")
# # Aliasing and expressions
# demo = df.select(
#     *[F.col(c) for c in df.columns],
# )

# # Example: Add a constant column (lit), and create boolean conditions
# demo = demo.withColumn("CONST_ONE", F.lit(1))

# # String search (if any string column exists)
# str_col = None
# for c, t in df.dtypes:
#     if t == "string":
#         str_col = c
#         break

# if str_col:
#     print(f"Using string column: {str_col}")
#     demo.filter(F.col(str_col).isNotNull()).select(str_col).show(5)
#     demo.filter(F.col(str_col).like("%a%")).select(str_col).show(5)  # contains letter 'a'
#     demo.filter(F.col(str_col).isin(demo.select(str_col).limit(3).rdd.flatMap(lambda x: x).collect())).select(str_col).show(5)

# # Null handling preview
# print("\n--- Null handling ---")
# print("Rows with ANY null in any column:", df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]).agg(
#     sum([F.col(c) for c in df.columns])).collect()[0][0])

# null_counts = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
# null_counts.show()

# # Replace nulls generically
# filled = df.fillna(0)  # numeric nulls → 0; strings stay unchanged
# filled.show(3)
# # """


# # =========================================================
# # DAY 3 — GROUPBY/AGG, MULTI-AGG, PIVOT, HAVING
# # =========================================================

# # UNCOMMENT to practice Day-3
# # """
# print("\n================= DAY 3 =================")
# # Pick a categorical col and numeric col for aggregations
# cat_col = None
# for c, t in df.dtypes:
#     if t not in ("int","bigint","double","float","long","decimal"):
#         cat_col = c
#         break

# num_cols = [c for c, t in df.dtypes if t in ("int","bigint","double","float","long","decimal")]

# if cat_col and num_cols:
#     agg_df = df.groupBy(cat_col).agg(
#         F.count("*").alias("rows"),
#         *[F.round(F.avg(F.col(n)), 3).alias(f"avg_{n}") for n in num_cols[:2]]
#     ).orderBy(F.col("rows").desc())
#     agg_df.show()

#     # Pivot (only if a second categorical col exists)
#     # Create a small demo: average of first numeric col by (cat_col, another_cat)
#     other_cat = None
#     for c, t in df.dtypes:
#         if t not in ("int","bigint","double","float","long","decimal") and c != cat_col:
#             other_cat = c
#             break

#     if other_cat and len(num_cols) > 0:
#         print(f"\n--- Pivot: avg({num_cols[0]}) by {cat_col} pivot {other_cat} ---")
#         pivot_df = (df.groupBy(cat_col)
#                       .pivot(other_cat)
#                       .agg(F.round(F.avg(F.col(num_cols[0])), 2)))
#         pivot_df.show()

# # Having-like filter: use filter after agg
# if cat_col:
#     print("\n--- Having-like: keep groups with rows >= 10 ---")
#     gb = df.groupBy(cat_col).agg(F.count("*").alias("rows"))
#     gb.filter(F.col("rows") >= 10).show()
# # """


# # =========================================================
# # DAY 4 — STRING/DATE/NUMERIC FUNCTIONS, UDFs, JSON, ARRAY/EXPLODE
# # =========================================================

# # UNCOMMENT to practice Day-4
# # """
# print("\n================= DAY 4 =================")

# # STRING FUNCS
# if str_col:
#     print("\n--- STRING FUNCS ---")
#     str_df = df.select(
#         F.col(str_col).alias("raw"),
#         F.upper(F.col(str_col)).alias("upper"),
#         F.lower(F.col(str_col)).alias("lower"),
#         F.length(F.col(str_col)).alias("len"),
#         F.substring(F.col(str_col), 1, 3).alias("first3")
#     )
#     str_df.show(5, truncate=False)

# # DATE/TIME FUNCS — make a tiny demo column
# print("\n--- DATE/TIME FUNCS ---")
# demo_dates = spark.createDataFrame([
#     ("A", "2025-08-31 10:15:00"),
#     ("B", "2025-09-01 22:45:33"),
#     ("C", "2025-09-03 05:00:00")
# ], schema="id string, ts string")

# dt = (demo_dates
#       .withColumn("ts_ts", F.to_timestamp("ts"))
#       .withColumn("date", F.to_date("ts_ts"))
#       .withColumn("year", F.year("ts_ts"))
#       .withColumn("month", F.month("ts_ts"))
#       .withColumn("day", F.dayofmonth("ts_ts"))
#       .withColumn("dow", F.date_format("ts_ts", "E"))
#       .withColumn("plus_7d", F.date_add(F.col("date"), 7))
#       .withColumn("diff_days", F.datediff(F.col("plus_7d"), F.col("date"))))
# dt.show(truncate=False)

# # NUMERIC FUNCS
# if num_col:
#     print("\n--- NUMERIC FUNCS ---")
#     num_demo = df.select(
#         F.col(num_col).alias("x"),
#         F.round(F.col(num_col), 2).alias("round2"),
#         F.sqrt(F.abs(F.col(num_col))).alias("sqrt_abs"),
#         F.when(F.col(num_col) > 10, "big").otherwise("small").alias("bucket")
#     )
#     num_demo.show(5)

# # UDF (User Defined Function) – only when truly required
# print("\n--- UDF demo (square numeric) ---")
# @F.udf("double")
# def square_udf(x):
#     return float(x)**2 if x is not None else None

# if num_col:
#     df.withColumn("squared", square_udf(F.col(num_col))).select(num_col, "squared").show(5)

# # ARRAY / EXPLODE / JSON
# print("\n--- ARRAY/EXPLODE/JSON ---")
# arr_df = spark.createDataFrame([
#     (1, ["red","blue","green"], '{"k":1,"msg":"hi"}'),
#     (2, ["x","y"], '{"k":2,"msg":"yo"}'),
# ], schema="id int, colors array<string>, meta string")

# arr_df.select("id", "colors", F.explode("colors").alias("color")).show()

# json_df = arr_df.select(
#     "id",
#     F.from_json("meta", T.StructType([
#         T.StructField("k", T.IntegerType(), True),
#         T.StructField("msg", T.StringType(), True)
#     ])).alias("meta_parsed")
# )
# json_df.select("id", "meta_parsed.*").show()
# # """


# # =========================================================
# # DAY 5 — JOINS (inner, left, right, full), SEMI/ANTI, UNION, DEDUP
# # =========================================================

# # UNCOMMENT to practice Day-5
# # """
# print("\n================= DAY 5 =================")

# # Synthetic small tables for joins
# customers = spark.createDataFrame([
#     (1, "Amit", "Delhi"),
#     (2, "Riya", "Noida"),
#     (3, "Karan", "Gurgaon"),
#     (4, "Neha", "Delhi"),
# ], schema="cust_id int, name string, city string")

# orders = spark.createDataFrame([
#     (101, 1, "2025-08-30", 2999.0),
#     (102, 1, "2025-09-01", 1599.0),
#     (103, 2, "2025-08-29",  899.0),
#     (104, 5, "2025-08-28", 1200.0),  # cust_id=5 not in customers to demo outer join
# ], schema="order_id int, cust_id int, order_date string, amount double")

# print("\nCustomers:"); customers.show()
# print("Orders:"); orders.show()

# inner_j = customers.join(orders, "cust_id", "inner")
# left_j  = customers.join(orders, "cust_id", "left")
# right_j = customers.join(orders, "cust_id", "right")
# full_j  = customers.join(orders, "cust_id", "outer")

# print("\n--- INNER JOIN ---"); inner_j.show()
# print("\n--- LEFT JOIN ---"); left_j.show()
# print("\n--- RIGHT JOIN ---"); right_j.show()
# print("\n--- FULL OUTER JOIN ---"); full_j.show()

# # SEMI (keep left rows that have a match) & ANTI (keep left rows that DON'T have a match)
# semi_j = customers.join(orders, "cust_id", "left_semi")
# anti_j = customers.join(orders, "cust_id", "left_anti")
# print("\n--- LEFT SEMI (customers with at least 1 order) ---"); semi_j.show()
# print("\n--- LEFT ANTI (customers with NO orders) ---"); anti_j.show()

# # UNION (stack vertically, same schema)
# customers2 = spark.createDataFrame([
#     (5, "Meera", "Ghaziabad"),
#     (6, "Vikram", "Faridabad"),
# ], schema=customers.schema)
# all_customers = customers.unionByName(customers2)
# print("\n--- UNION customers + customers2 ---"); all_customers.show()

# # DEDUP (dropDuplicates)
# dedup = all_customers.unionByName(customers).dropDuplicates(["cust_id"])
# print("\n--- DEDUP by cust_id ---"); dedup.orderBy("cust_id").show()
# # """


# # =========================================================
# # DAY 6 — WINDOW FUNCTIONS: rank, dense_rank, row_number, lag/lead, rolling
# # =========================================================

# # UNCOMMENT to practice Day-6
# # """
# print("\n================= DAY 6 =================")
# # Using 'orders' from Day-5 (re-create if you skipped Day-5)
# orders = spark.createDataFrame([
#     (101, 1, "2025-08-30", 2999.0),
#     (102, 1, "2025-09-01", 1599.0),
#     (103, 2, "2025-08-29",  899.0),
#     (104, 5, "2025-08-28", 1200.0),
#     (105, 2, "2025-09-02", 1799.0),
#     (106, 1, "2025-09-03",  499.0),
# ], schema="order_id int, cust_id int, order_date string, amount double")

# w_cust_date = Window.partitionBy("cust_id").orderBy(F.to_date("order_date"))

# win_df = (orders
#           .withColumn("rn", F.row_number().over(w_cust_date))
#           .withColumn("rnk", F.rank().over(w_cust_date))
#           .withColumn("dense_rnk", F.dense_rank().over(w_cust_date))
#           .withColumn("prev_amount", F.lag("amount", 1).over(w_cust_date))
#           .withColumn("next_amount", F.lead("amount", 1).over(w_cust_date))
#           .withColumn("diff_from_prev", F.col("amount") - F.col("prev_amount"))
#           )

# print("\n--- Window basics ---"); win_df.orderBy("cust_id", "order_date").show()

# # Running total (cumulative sum): explicit ROWS frame from the partition start to the current row
# w_rows = Window.partitionBy("cust_id").orderBy(F.to_date("order_date")).rowsBetween(Window.unboundedPreceding, 0)
# rolling = orders.withColumn("running_total", F.sum("amount").over(w_rows))
# print("\n--- Running total per customer ---"); rolling.orderBy("cust_id", "order_date").show()
# # """


# # =========================================================
# # DAY 7 — PERFORMANCE, WRITE, FILE FORMATS, REPARTITION, EXPLAIN
# # =========================================================

# # UNCOMMENT to practice Day-7
# # """
# print("\n================= DAY 7 =================")

# # Repartition / coalesce (control parallelism & output files)
# print("\n--- Repartition/Coalesce demo ---")
# r_df = df.repartition(4)      # increase partitions
# print("Partitions after repartition(4):", r_df.rdd.getNumPartitions())
# c_df = r_df.coalesce(1)       # reduce partitions (try to write single output file)
# print("Partitions after coalesce(1):", c_df.rdd.getNumPartitions())

# # Write outputs (small sample to avoid huge storage)
# out_base = "/content/pyspark_outputs"
# os.makedirs(out_base, exist_ok=True)

# (df.limit(500)
#    .write.mode("overwrite").option("header", True).csv(f"{out_base}/csv_output"))
# print("✅ Wrote CSV to:", f"{out_base}/csv_output")

# (df.limit(500)
#    .write.mode("overwrite").parquet(f"{out_base}/parquet_output"))
# print("✅ Wrote Parquet to:", f"{out_base}/parquet_output")

# # Explain plans (how Spark will execute)
# print("\n--- EXPLAIN plan (physical) ---")
# df.explain(mode="formatted")

# # Broadcast join hint (small table broadcast to speed up)
# small_dim = spark.createDataFrame([(1,"VIP"), (2,"REG"), (3,"NEW")], "cust_id int, segment string")
# orders = spark.createDataFrame([
#     (101, 1, 2999.0),
#     (102, 1, 1599.0),
#     (103, 2,  899.0),
#     (106, 1,  499.0),
# ], "order_id int, cust_id int, amount double")

# joined = orders.join(F.broadcast(small_dim), "cust_id", "left")
# print("\n--- Broadcast Join ---"); joined.show()

# # Basic SQL usage (optional)
# print("\n--- Spark SQL demo ---")
# df.createOrReplaceTempView("t")
# spark.sql("SELECT * FROM t LIMIT 5").show()
# # """


# # =========================================================
# # MINI PROJECT — END-TO-END (Load → Clean → Transform → Analyze → Export)
# # =========================================================

# # UNCOMMENT to practice Mini Project
# # """
# print("\n================= MINI PROJECT =================")

# # 1) Load: already done into df

# # 2) Clean: standardize column names (snake_case), trim strings
# clean = df
# for c in df.columns:
#     new_c = (c.strip()
#                .lower()
#                .replace(" ", "_")
#                .replace("(", "")
#                .replace(")", "")
#                .replace("-", "_"))
#     clean = clean.withColumnRenamed(c, new_c)

# # Trim string columns & handle nulls
# for c, t in clean.dtypes:
#     if t == "string":
#         clean = clean.withColumn(c, F.trim(F.col(c)))

# # 3) Transform: demo features (example assumes there is at least one numeric col)
# # Add: zscore of first numeric column + bucketize
# num_cols = [c for c, t in clean.dtypes if t in ("int","bigint","double","float","long","decimal")]
# if num_cols:
#     x = num_cols[0]
#     stats = clean.select(F.avg(x).alias("mu"), F.stddev(x).alias("sd")).first()
#     mu, sd = stats["mu"], stats["sd"] or 1.0
#     clean = (clean
#              .withColumn(f"{x}_z", (F.col(x) - F.lit(mu)) / F.lit(sd))
#              .withColumn(f"{x}_bucket", F.when(F.col(x) <= mu, "LOW").otherwise("HIGH"))
#             )

# # 4) Analyze: group/pivot/correlations
# # 4a) Null report
# null_report = clean.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in clean.columns])
# print("\n--- Null report ---"); null_report.show()

# # 4b) If at least two numeric columns exist → corr matrix (pairwise)
# num_cols = [c for c, t in clean.dtypes if t in ("int","bigint","double","float","long","decimal")]
# if len(num_cols) >= 2:
#     pairs = []
#     for i in range(len(num_cols)):
#         for j in range(i+1, len(num_cols)):
#             c1, c2 = num_cols[i], num_cols[j]
#             corr_val = clean.select(F.corr(c1, c2).alias("corr")).first()["corr"]
#             pairs.append((c1, c2, corr_val))
#     corr_df = spark.createDataFrame(pairs, schema="col1 string, col2 string, corr double")
#     print("\n--- Pairwise correlations ---"); corr_df.show(truncate=False)

# # 4c) Quick frequency table (first string column vs first numeric column avg)
# str_cols = [c for c, t in clean.dtypes if t == "string"]
# if str_cols and num_cols:
#     ccat, cnum = str_cols[0], num_cols[0]
#     freq = (clean.groupBy(ccat).agg(
#                 F.count("*").alias("rows"),
#                 F.round(F.avg(cnum),2).alias(f"avg_{cnum}")
#            ).orderBy(F.col("rows").desc()))
#     print(f"\n--- Frequency of {ccat} with avg({cnum}) ---"); freq.show()

# # 5) Export results
# out_project = "/content/project_output"
# (clean.limit(1000).write.mode("overwrite").parquet(f"{out_project}/clean_parquet"))
# freq.limit(1000).write.mode("overwrite").option("header", True).csv(f"{out_project}/freq_csv")
# print("✅ Project outputs saved under:", out_project)
# # """


# print("\n🎯 All sections ready. Comment/Uncomment blocks for your daily practice. Happy Learning!")
