# Part 1: Advanced Spark

A data lake is a centralized storage system that lets you:

* Ingest and store raw data from any source (structured, semi-structured, unstructured)

* Transform it using big data engines like Apache Spark

* Make it queryable via engines like Hive, Presto, or Spark SQL, optionally on top of a table format such as Delta Lake

Hive lets you query large datasets with HiveQL, a SQL-like language.

In [126]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()

Step 1: Load CSV with an Inferred Schema

In [127]:
df = spark.read.csv("sales.txt", header=True, inferSchema=True)
df.printSchema()
df.show()

root
 |-- date: date (nullable = true)
 |-- region: string (nullable = true)
 |-- sales: integer (nullable = true)
 |-- product_id: integer (nullable = true)

+----------+------+-----+----------+
|      date|region|sales|product_id|
+----------+------+-----+----------+
|2024-01-01| North|  100|       101|
|2024-01-01| South|  200|       102|
|2024-01-02| North|  150|       101|
|2024-01-02| South|  300|       103|
|2024-01-03|  East|  250|       104|
|2024-01-03|  West|  180|       105|
|2024-01-04| North|   90|       106|
|2024-01-04|  East|  200|       104|
|2024-01-05| South|  210|       102|
|2024-01-05|  West|  300|       107|
+----------+------+-----+----------+



Or define the schema manually. This skips the extra pass over the data that `inferSchema` needs and makes type mismatches visible.

In [117]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema = StructType([
    StructField("date", DateType(), True),
    StructField("region", StringType(), True),
    StructField("sales", IntegerType(), True),
    StructField("product_id", IntegerType(), True)
])

df = spark.read.csv("sales.txt", header=True, schema=schema)
df.printSchema()
df.show()

root
 |-- date: date (nullable = true)
 |-- region: string (nullable = true)
 |-- sales: integer (nullable = true)
 |-- product_id: integer (nullable = true)

+----------+------+-----+----------+
|      date|region|sales|product_id|
+----------+------+-----+----------+
|2024-06-08| North| NULL|       345|
|2024-06-26|  West| NULL|       372|
|2024-06-20|  East| NULL|       304|
|2024-06-12| North| NULL|       324|
|2024-06-24| North| NULL|       312|
|2024-06-06|  East| NULL|       340|
|2024-06-02| South| NULL|       321|
|2024-06-22|  West| NULL|       363|
|2024-06-17| North| NULL|       345|
|2024-06-16|  East| NULL|       306|
|2024-06-07|  East| NULL|       328|
|2024-06-01|  West| NULL|       396|
|2024-06-08|  East| NULL|       310|
|2024-06-15| North| NULL|       341|
|2024-06-14|  East| NULL|       315|
|2024-06-20|  West| NULL|       389|
|2024-06-19|  West| NULL|       368|
|2024-06-08|  East| NULL|       400|
|2024-06-17| South| NULL|       400|
|2024-06-08| North| NULL|  

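> `True` (the third argument to `StructField`) means the column is nullable: it is allowed to contain null (missing) values.
>
> Note: the output above appears to come from a different input file than Step 1 (June dates, other product IDs), and every `sales` value is NULL. This is what a schema mismatch typically looks like: a value that cannot be parsed as the declared type, for example a decimal in an `IntegerType` column, is read as NULL instead of raising an error.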
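
To make the effect of a declared schema concrete, here is a minimal plain-Python sketch (no Spark needed) of a permissive reader: a field that cannot be parsed as its declared type becomes `None` while the rest of the row is kept. The `parse_row` helper and the second sample line are hypothetical; Spark's actual CSV reader is considerably more involved.

```python
from datetime import date

# Declared schema: column name -> parser (stand-ins for the Spark types)
schema = {
    "date": date.fromisoformat,   # DateType
    "region": str,                # StringType
    "sales": int,                 # IntegerType
    "product_id": int,            # IntegerType
}

def parse_row(line):
    """Parse one CSV line against the schema; unparseable fields become None."""
    row = {}
    for (name, parser), raw in zip(schema.items(), line.split(",")):
        try:
            row[name] = parser(raw)
        except ValueError:
            row[name] = None  # permissive: keep the row, null out the bad field
    return row

good = parse_row("2024-01-01,North,100,101")
bad = parse_row("2024-06-08,North,793.41,345")  # hypothetical: decimal in an integer column

print(good)
print(bad)
```

The second row keeps its date, region, and product ID, but `sales` comes back as `None`, which is the same pattern as the all-NULL column in the output above.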

Step 2: Basic Aggregation: Sum of sales per region.

In [128]:
df.groupBy("region").sum("sales").show()

+------+----------+
|region|sum(sales)|
+------+----------+
| South|       710|
|  East|       450|
|  West|       480|
| North|       340|
+------+----------+



In [129]:
from pyspark.sql import functions as F  # avoids shadowing Python's built-in round and sum

df.groupBy("region") \
  .agg(F.round(F.sum("sales"), 2).alias("total_sales")) \
  .show()

+------+-----------+
|region|total_sales|
+------+-----------+
| South|        710|
|  East|        450|
|  West|        480|
| North|        340|
+------+-----------+



The same aggregation can be written in SQL by registering the DataFrame as a temporary view.

In [130]:
df.createOrReplaceTempView("sales_view")

spark.sql("""
    SELECT region, ROUND(SUM(sales), 2) AS sum_sales
    FROM sales_view
    GROUP BY region
""").show()

+------+---------+
|region|sum_sales|
+------+---------+
| South|      710|
|  East|      450|
|  West|      480|
| North|      340|
+------+---------+



> Note:
> `df.groupBy().sum()` automatically names the column `sum(sales)`.
> In SQL, you can alias it using `AS` (e.g., `AS sum_sales`).
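
As a cross-check on the totals, the same group-and-sum can be written in plain Python. This is only a sketch of what the aggregation computes on the ten sample rows, not of how Spark executes it.

```python
from collections import defaultdict

# (region, sales) pairs copied from the sample table
rows = [
    ("North", 100), ("South", 200), ("North", 150), ("South", 300),
    ("East", 250), ("West", 180), ("North", 90), ("East", 200),
    ("South", 210), ("West", 300),
]

totals = defaultdict(int)
for region, sales in rows:
    totals[region] += sales

# Print regions ordered by total, largest first
for region, total in sorted(totals.items(), key=lambda kv: -kv[1]):
    print(f"{region:>6} {total}")
```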

Step 3: Window Function — Rolling Total

Window: Defines the partitioning, ordering, and frame of rows over which an aggregation is calculated for each row.



In [131]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Frame: from the first row of the partition up to and including the current row
windowSpec = Window.partitionBy("region").orderBy("date") \
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("cumulative_sales", F.sum("sales").over(windowSpec)).show()

+----------+------+-----+----------+----------------+
|      date|region|sales|product_id|cumulative_sales|
+----------+------+-----+----------+----------------+
|2024-01-03|  East|  250|       104|             250|
|2024-01-04|  East|  200|       104|             450|
|2024-01-01| North|  100|       101|             100|
|2024-01-02| North|  150|       101|             250|
|2024-01-04| North|   90|       106|             340|
|2024-01-01| South|  200|       102|             200|
|2024-01-02| South|  300|       103|             500|
|2024-01-05| South|  210|       102|             710|
|2024-01-03|  West|  180|       105|             180|
|2024-01-05|  West|  300|       107|             480|
+----------+------+-----+----------+----------------+
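
The running total can be reproduced by hand. The sketch below mimics the window in plain Python: sort by region and date (the partition and order), then accumulate within each region (the frame from the start of the partition to the current row).

```python
from itertools import accumulate, groupby

# (date, region, sales) from the sample data
rows = [
    ("2024-01-01", "North", 100), ("2024-01-01", "South", 200),
    ("2024-01-02", "North", 150), ("2024-01-02", "South", 300),
    ("2024-01-03", "East", 250), ("2024-01-03", "West", 180),
    ("2024-01-04", "North", 90), ("2024-01-04", "East", 200),
    ("2024-01-05", "South", 210), ("2024-01-05", "West", 300),
]

# partitionBy("region").orderBy("date"): sort by region, then by date
ordered = sorted(rows, key=lambda r: (r[1], r[0]))

cumulative = {}
for region, part in groupby(ordered, key=lambda r: r[1]):
    sales = [r[2] for r in part]
    # Running total within the partition
    cumulative[region] = list(accumulate(sales))

print(cumulative)
```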



Example: 3-row Moving Average of sales (centered)

In [132]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

windowSpec = Window.partitionBy("region").orderBy("date") \
                   .rowsBetween(-1, 1)  # 1 row before, current row, 1 row after

df.withColumn("moving_avg_sales", avg("sales").over(windowSpec)).show()


+----------+------+-----+----------+------------------+
|      date|region|sales|product_id|  moving_avg_sales|
+----------+------+-----+----------+------------------+
|2024-01-03|  East|  250|       104|             225.0|
|2024-01-04|  East|  200|       104|             225.0|
|2024-01-01| North|  100|       101|             125.0|
|2024-01-02| North|  150|       101|113.33333333333333|
|2024-01-04| North|   90|       106|             120.0|
|2024-01-01| South|  200|       102|             250.0|
|2024-01-02| South|  300|       103|236.66666666666666|
|2024-01-05| South|  210|       102|             255.0|
|2024-01-03|  West|  180|       105|             240.0|
|2024-01-05|  West|  300|       107|             240.0|
+----------+------+-----+----------+------------------+



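> In the cumulative example, `rowsBetween(Window.unboundedPreceding, 0)` includes, for each row, all rows from the beginning of the partition up to the current row; this is what makes the total cumulative.
>
> In the centered example, `rowsBetween(-1, 1)` covers the previous, current, and next row. At the edges of a partition the frame is simply shorter, which is why the first and last rows of each region average only two values.

> Alternative: 3-row trailing moving average
> `.rowsBetween(-2, 0)`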
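
To compare the two frames, here is a small plain-Python sketch applied to the North partition. `frame_avg` is a hypothetical helper that averages the rows from `i + start` to `i + end`, clipped at the partition boundaries, which is how a row-based frame behaves at the edges.

```python
def frame_avg(values, start, end):
    """Average over rows i+start .. i+end, clipped to the partition bounds."""
    result = []
    for i in range(len(values)):
        lo = max(0, i + start)
        hi = min(len(values) - 1, i + end)
        window = values[lo:hi + 1]
        result.append(sum(window) / len(window))
    return result

north = [100, 150, 90]  # North sales ordered by date

centered = frame_avg(north, -1, 1)   # like rowsBetween(-1, 1)
trailing = frame_avg(north, -2, 0)   # like rowsBetween(-2, 0)
print(centered)
print(trailing)
```

The centered values agree with the North rows in the table above; the trailing version only looks backwards, so it never uses a later row.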

Step 4: Join with Product Data

> Joins sales with product descriptions.
>
> * `how="left"` keeps all sales rows even if the product name is missing.



In [135]:
products_df = spark.read.csv("products.txt", header=True, inferSchema=True)
joined_df = df.join(products_df, on="product_id", how="left")
joined_df.select("date", "region", "sales", "product_name", "price").show()

+----------+------+-----+------------+------+
|      date|region|sales|product_name| price|
+----------+------+-----+------------+------+
|2024-01-01| North|  100|       Phone|599.99|
|2024-01-01| South|  200|      Tablet|399.99|
|2024-01-02| North|  150|       Phone|599.99|
|2024-01-02| South|  300|      Laptop|999.99|
|2024-01-03|  East|  250|     Monitor|199.99|
|2024-01-03|  West|  180|    Keyboard| 49.99|
|2024-01-04| North|   90|       Mouse| 29.99|
|2024-01-04|  East|  200|     Monitor|199.99|
|2024-01-05| South|  210|      Tablet|399.99|
|2024-01-05|  West|  300|     Printer|149.99|
+----------+------+-----+------------+------+



Step 5: SQL Query on DataFrame

In [136]:
joined_df.createOrReplaceTempView("sales_view")

spark.sql("""
    SELECT region, SUM(sales) as total_sales
    FROM sales_view
    WHERE date >= '2024-01-02'
    GROUP BY region
""").show()


+------+-----------+
|region|total_sales|
+------+-----------+
| South|        510|
|  East|        450|
|  West|        480|
| North|        240|
+------+-----------+



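Step 6: Join the two tables in SQL

> A plain `JOIN` is an inner join: unlike the `how="left"` join used earlier, sales rows without a matching product are dropped.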

In [137]:
df.createOrReplaceTempView("sales_view")
products_df.createOrReplaceTempView("products_view")
spark.sql("""
    SELECT
        s.date,
        s.region,
        s.sales,
        s.product_id,
        p.product_name
    FROM sales_view s
    JOIN products_view p
      ON s.product_id = p.product_id
    WHERE s.date >= '2024-01-02'
""").show()


+----------+------+-----+----------+------------+
|      date|region|sales|product_id|product_name|
+----------+------+-----+----------+------------+
|2024-01-02| North|  150|       101|       Phone|
|2024-01-02| South|  300|       103|      Laptop|
|2024-01-03|  East|  250|       104|     Monitor|
|2024-01-03|  West|  180|       105|    Keyboard|
|2024-01-04| North|   90|       106|       Mouse|
|2024-01-04|  East|  200|       104|     Monitor|
|2024-01-05| South|  210|       102|      Tablet|
|2024-01-05|  West|  300|       107|     Printer|
+----------+------+-----+----------+------------+
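
In the sample data every sale has a matching product, so the inner join here and the earlier left join return the same rows. The difference only shows up when a product is missing. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with one hypothetical sale (product 999) that has no product record.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sales_view (date TEXT, region TEXT, sales INTEGER, product_id INTEGER);
    CREATE TABLE products_view (product_id INTEGER, product_name TEXT);
    INSERT INTO sales_view VALUES
        ('2024-01-02', 'North', 150, 101),
        ('2024-01-03', 'East', 250, 104),
        ('2024-01-06', 'West', 120, 999);  -- hypothetical sale with no matching product
    INSERT INTO products_view VALUES (101, 'Phone'), (104, 'Monitor');
""")

query = """
    SELECT s.date, s.region, s.sales, p.product_name
    FROM sales_view s
    {join} products_view p ON s.product_id = p.product_id
    ORDER BY s.date
"""
inner_rows = conn.execute(query.format(join="JOIN")).fetchall()
left_rows = conn.execute(query.format(join="LEFT JOIN")).fetchall()

print(len(inner_rows), "rows with JOIN")
print(len(left_rows), "rows with LEFT JOIN")
print(left_rows[-1])  # the unmatched sale, with a NULL product name
```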



Step 7: Cache and Repartition

In [138]:
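df.cache()   # lazy: only marks the DataFrame for caching
df.count()   # an action, so the cache is actually populated

# Returns a new DataFrame, hash-partitioned by region into 4 partitions
df = df.repartition(4, "region")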

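> Redistributes the DataFrame into 4 partitions.
>
> Uses region as the partitioning key: rows are assigned by a hash of the key, so all rows with the same region end up in the same partition. Different regions can still share a partition, as North and West do in the output below.
>
> Causes a shuffle (data movement across the cluster), which can be expensive.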
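
The idea behind key-based repartitioning can be sketched in a few lines of plain Python: hash the key and take it modulo the number of partitions. CRC32 is used here purely as a stable stand-in; Spark uses its own hash function, so the actual partition numbers will differ.

```python
import zlib

def partition_for(key, num_partitions):
    """Stable hash of the key modulo the partition count (CRC32 as a stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

rows = [("North", 100), ("South", 200), ("North", 150), ("East", 250),
        ("West", 180), ("South", 300)]

partitions = [[] for _ in range(4)]
for region, sales in rows:
    partitions[partition_for(region, 4)].append((region, sales))

for i, part in enumerate(partitions):
    print(i, part)
```

Whatever the hash function, the same key always maps to the same partition, while two different keys may collide and some partitions may stay empty.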



In [139]:
print("Partitions:", df.rdd.getNumPartitions())
df.rdd.glom().map(lambda x: [row.asDict() for row in x]).collect()

Partitions: 4


[[{'date': datetime.date(2024, 1, 1),
   'region': 'North',
   'sales': 100,
   'product_id': 101},
  {'date': datetime.date(2024, 1, 2),
   'region': 'North',
   'sales': 150,
   'product_id': 101},
  {'date': datetime.date(2024, 1, 3),
   'region': 'West',
   'sales': 180,
   'product_id': 105},
  {'date': datetime.date(2024, 1, 4),
   'region': 'North',
   'sales': 90,
   'product_id': 106},
  {'date': datetime.date(2024, 1, 5),
   'region': 'West',
   'sales': 300,
   'product_id': 107}],
 [{'date': datetime.date(2024, 1, 3),
   'region': 'East',
   'sales': 250,
   'product_id': 104},
  {'date': datetime.date(2024, 1, 4),
   'region': 'East',
   'sales': 200,
   'product_id': 104}],
 [{'date': datetime.date(2024, 1, 1),
   'region': 'South',
   'sales': 200,
   'product_id': 102},
  {'date': datetime.date(2024, 1, 2),
   'region': 'South',
   'sales': 300,
   'product_id': 103},
  {'date': datetime.date(2024, 1, 5),
   'region': 'South',
   'sales': 210,
   'product_id': 102}],
 [

> This shows a list of lists: one inner list per partition, each containing its rows as dictionaries. All ten rows already appear in the first three partitions, so the fourth holds no rows.

Save to Disk (Parquet)

In [140]:
joined_df.show()
joined_df.write.mode("overwrite").parquet("output/joined_sales_products.parquet")

+----------+----------+------+-----+------------+-----------+------+
|product_id|      date|region|sales|product_name|   category| price|
+----------+----------+------+-----+------------+-----------+------+
|       101|2024-01-01| North|  100|       Phone|Electronics|599.99|
|       102|2024-01-01| South|  200|      Tablet|Electronics|399.99|
|       101|2024-01-02| North|  150|       Phone|Electronics|599.99|
|       103|2024-01-02| South|  300|      Laptop|Electronics|999.99|
|       104|2024-01-03|  East|  250|     Monitor|Accessories|199.99|
|       105|2024-01-03|  West|  180|    Keyboard|Accessories| 49.99|
|       106|2024-01-04| North|   90|       Mouse|Accessories| 29.99|
|       104|2024-01-04|  East|  200|     Monitor|Accessories|199.99|
|       102|2024-01-05| South|  210|      Tablet|Electronics|399.99|
|       107|2024-01-05|  West|  300|     Printer|     Office|149.99|
+----------+----------+------+-----+------------+-----------+------+



Option 1: Register in Spark catalog (temporary view)

In [145]:
# Write the Parquet file
joined_df.write.mode("overwrite").parquet("output/sales_products/")

# Register the location as a SQL view
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW sales_products_view
    USING PARQUET
    OPTIONS (
        path "output/sales_products/"
    )
""")

# Query it
spark.sql("SELECT * FROM sales_products_view LIMIT 10").show()


+----------+----------+------+-----+------------+-----------+------+
|product_id|      date|region|sales|product_name|   category| price|
+----------+----------+------+-----+------------+-----------+------+
|       101|2024-01-01| North|  100|       Phone|Electronics|599.99|
|       102|2024-01-01| South|  200|      Tablet|Electronics|399.99|
|       101|2024-01-02| North|  150|       Phone|Electronics|599.99|
|       103|2024-01-02| South|  300|      Laptop|Electronics|999.99|
|       104|2024-01-03|  East|  250|     Monitor|Accessories|199.99|
|       105|2024-01-03|  West|  180|    Keyboard|Accessories| 49.99|
|       106|2024-01-04| North|   90|       Mouse|Accessories| 29.99|
|       104|2024-01-04|  East|  200|     Monitor|Accessories|199.99|
|       102|2024-01-05| South|  210|      Tablet|Electronics|399.99|
|       107|2024-01-05|  West|  300|     Printer|     Office|149.99|
+----------+----------+------+-----+------------+-----------+------+



Option 2: Using Hive Metastore (if enabled)

In [146]:
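# Hive support has to be enabled when the session is first created. If a session
# is already running, getOrCreate() returns it and this setting is not applied.
spark = SparkSession.builder \
    .appName("DataLakeWithCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

joined_df.write.mode("overwrite").saveAsTable("sales_products")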

Query the Hive Table in Spark

In [147]:
# List available Hive tables
spark.sql("SHOW TABLES").show()

# See the schema of the table
spark.sql("DESCRIBE sales_products").show()

# Run a simple query
spark.sql("SELECT region, SUM(sales) as total_sales FROM sales_products GROUP BY region").show()

# Query with filters
spark.sql("SELECT * FROM sales_products WHERE category = 'Electronics' AND sales > 500").show()


+---------+-------------------+-----------+
|namespace|          tableName|isTemporary|
+---------+-------------------+-----------+
|  default|     sales_products|      false|
|         |      products_view|       true|
|         |sales_products_view|       true|
|         |         sales_view|       true|
+---------+-------------------+-----------+

+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|  product_id|      int|   NULL|
|        date|     date|   NULL|
|      region|   string|   NULL|
|       sales|      int|   NULL|
|product_name|   string|   NULL|
|    category|   string|   NULL|
|       price|   double|   NULL|
+------------+---------+-------+

+------+-----------+
|region|total_sales|
+------+-----------+
| South|        710|
|  East|        450|
|  West|        480|
| North|        340|
+------+-----------+

+----------+----+------+-----+------------+--------+-----+
|product_id|date|region|sales|product_name|category|pri

Save to Disk (CSV)

In [141]:
joined_df.write \
    .option("header", True) \
    .mode("overwrite") \
    .csv("output/joined_sales_products.csv")

# Part 2: ML

Load the Joined Data

In [142]:
joined_df = spark.read.parquet("output/joined_sales_products.parquet")
joined_df.show()

+----------+----------+------+-----+------------+-----------+------+
|product_id|      date|region|sales|product_name|   category| price|
+----------+----------+------+-----+------------+-----------+------+
|       101|2024-01-01| North|  100|       Phone|Electronics|599.99|
|       102|2024-01-01| South|  200|      Tablet|Electronics|399.99|
|       101|2024-01-02| North|  150|       Phone|Electronics|599.99|
|       103|2024-01-02| South|  300|      Laptop|Electronics|999.99|
|       104|2024-01-03|  East|  250|     Monitor|Accessories|199.99|
|       105|2024-01-03|  West|  180|    Keyboard|Accessories| 49.99|
|       106|2024-01-04| North|   90|       Mouse|Accessories| 29.99|
|       104|2024-01-04|  East|  200|     Monitor|Accessories|199.99|
|       102|2024-01-05| South|  210|      Tablet|Electronics|399.99|
|       107|2024-01-05|  West|  300|     Printer|     Office|149.99|
+----------+----------+------+-----+------------+-----------+------+



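Inspect Schema

> Note: the cells from here through the single-row prediction (In [82] to In [88]) appear to come from a run against a richer dataset. The schema includes `quantity`, `discount`, `channel`, and `base_price`, and `sales` is a double, so it does not match the table shown above.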

In [82]:
joined_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- region: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)
 |-- channel: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- base_price: double (nullable = true)



Choose numeric features

In [83]:
feature_cols = ["base_price", "quantity", "discount"]

Then use VectorAssembler

In [84]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(joined_df).select("features", "sales")

Train Linear Regression Model

In [85]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="sales")
lr_model = lr.fit(assembled_df)

Make Predictions

In [86]:
predictions = lr_model.transform(assembled_df)
predictions.select("features", "sales", "prediction").show(10)

+------------------+-------+------------------+
|          features|  sales|        prediction|
+------------------+-------+------------------+
|  [98.96,9.0,0.11]| 793.41|1264.6579182142525|
| [35.11,10.0,0.18]| 288.47|1122.8630183946618|
| [294.36,3.0,0.08]| 816.65| 873.9604500072271|
| [444.19,3.0,0.17]|1101.73|1479.7513424578617|
| [458.05,9.0,0.06]|3874.64|2971.5559766738897|
| [224.47,6.0,0.23]|1038.01|1068.2052035399354|
|[155.66,10.0,0.01]|1538.76| 1842.947607524383|
| [91.25,10.0,0.13]| 791.03|1430.2080404930382|
|  [98.96,7.0,0.03]| 673.72|  900.714979370545|
| [470.44,4.0,0.12]| 1651.5|1869.3773679931694|
+------------------+-------+------------------+
only showing top 10 rows



Evaluate Model

In [87]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="sales", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")

Root Mean Squared Error (RMSE): 369.67


To predict for a new input:

1. Create a DataFrame with the input
2. Assemble features with the same `VectorAssembler`
3. Predict with the trained model

In [88]:
from pyspark.sql import Row

# Create a single-row DataFrame with your input
input_data = spark.createDataFrame([
    Row(base_price=94.0, quantity=9.2, discount=0.11)
])
input_features = assembler.transform(input_data)  # re-use your VectorAssembler
prediction_result = lr_model.transform(input_features)
prediction_result.select("features", "prediction").show()

+---------------+-----------------+
|       features|       prediction|
+---------------+-----------------+
|[94.0,9.2,0.11]|1285.817023641747|
+---------------+-----------------+



Step-by-step ML in Spark on sales_products
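
The cell below encodes the string columns with `StringIndexer`. By default it assigns index 0 to the most frequent label, and recent Spark versions break ties alphabetically. A rough plain-Python sketch of that ordering follows; `string_index` is a hypothetical helper, not a Spark API.

```python
from collections import Counter

def string_index(values):
    """Map labels to indices: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Region column of the ten sample rows
regions = ["North", "South", "North", "South", "East",
           "West", "North", "East", "South", "West"]
print(string_index(regions))
```

On the sample rows this gives North 0, South 1, East 2, West 3, which agrees with the first component of the `features` vectors in the output. Keep in mind that these indices are arbitrary codes, while a linear model treats them as ordered magnitudes.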

In [148]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Step 1: Start Spark session with Hive support
spark = SparkSession.builder \
    .appName("SalesPrediction") \
    .enableHiveSupport() \
    .getOrCreate()

# Step 2: Load the Hive table
df = spark.sql("SELECT * FROM sales_products")

# Step 3: Handle categorical features
indexers = [
    StringIndexer(inputCol="region", outputCol="region_index"),
    StringIndexer(inputCol="product_name", outputCol="product_index"),
    StringIndexer(inputCol="category", outputCol="category_index")
]

for indexer in indexers:
    df = indexer.fit(df).transform(df)

# Step 4: Assemble features
feature_cols = ["region_index", "product_index", "category_index", "price"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(df).select("features", "sales")

# Step 5: Train a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="sales")
lr_model = lr.fit(assembled_df)

# Step 6: Make predictions
predictions = lr_model.transform(assembled_df)
predictions.select("features", "sales", "prediction").show(10)

# Optional: Evaluate model
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)
print("RMSE:", lr_model.summary.rootMeanSquaredError)
print("R2:", lr_model.summary.r2)


+--------------------+-----+------------------+
|            features|sales|        prediction|
+--------------------+-----+------------------+
|[0.0,1.0,0.0,599.99]|  100|157.62096040728568|
|[1.0,2.0,0.0,399.99]|  200| 171.6961154173007|
|[0.0,1.0,0.0,599.99]|  150|157.62096040728568|
|[1.0,4.0,0.0,999.99]|  300| 293.0639982675108|
|[2.0,0.0,1.0,199.99]|  250| 207.5259402438897|
| [3.0,3.0,1.0,49.99]|  180|236.11518672409272|
| [0.0,5.0,1.0,29.99]|   90| 85.43663295476075|
|[2.0,0.0,1.0,199.99]|  200| 207.5259402438897|
|[1.0,2.0,0.0,399.99]|  210| 171.6961154173007|
|[3.0,6.0,2.0,149.99]|  300|291.69814991668346|
+--------------------+-----+------------------+

Coefficients: [50.53102393038454,2.4000552178203036,28.95483547003486,0.19427962069094912]
Intercept: 38.65507557110281
RMSE: 32.85423022353661
R2: 0.7730444820056462
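
Because the table shows all ten rows of the training set, the reported metrics can be recomputed by hand. The sketch below applies the usual definitions, RMSE as the square root of the mean squared residual and R2 as one minus the ratio of residual to total sum of squares, to the values copied from the table.

```python
import math

# (sales, prediction) pairs copied from the table above
pairs = [
    (100, 157.62096040728568), (200, 171.6961154173007),
    (150, 157.62096040728568), (300, 293.0639982675108),
    (250, 207.5259402438897), (180, 236.11518672409272),
    (90, 85.43663295476075), (200, 207.5259402438897),
    (210, 171.6961154173007), (300, 291.69814991668346),
]

n = len(pairs)
ss_res = sum((y - y_hat) ** 2 for y, y_hat in pairs)   # residual sum of squares
mean_y = sum(y for y, _ in pairs) / n
ss_tot = sum((y - mean_y) ** 2 for y, _ in pairs)      # total sum of squares

rmse = math.sqrt(ss_res / n)
r2 = 1 - ss_res / ss_tot
print(f"RMSE: {rmse:.2f}")
print(f"R2:   {r2:.3f}")
```

Both numbers are measured on the data the model was trained on, so they describe fit rather than predictive accuracy on unseen rows.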


# Part 3: Spark and NLP

Load File into DataFrame

In [90]:
df = spark.read.text("feedback.txt").withColumnRenamed("value", "text")
df.show(truncate=False)

+--------------------------------------------------------+
|text                                                    |
+--------------------------------------------------------+
|Great experience with fast shipping and quality product.|
|Unhelpful and rude customer service representative.     |
|Helpful agent resolved my issue quickly.                |
|The service was excellent and very helpful.             |
|Slow response from the support team.                    |
|Battery life is poor and needs improvement.             |
|Delivery was late and the phone was damaged.            |
|I am disappointed with the damaged packaging.           |
|Customer support was not helpful at all.                |
|Monitor quality is excellent and arrived on time.       |
|I am disappointed with the damaged packaging.           |
|Battery life is poor and needs improvement.             |
|Repeated delivery issues and poor communication.        |
|The service was excellent and very helpful.            

Tokenize Text into Words

In [91]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized_df = tokenizer.transform(df)
tokenized_df.select("words").show(truncate=False)

+-----------------------------------------------------------------+
|words                                                            |
+-----------------------------------------------------------------+
|[great, experience, with, fast, shipping, and, quality, product.]|
|[unhelpful, and, rude, customer, service, representative.]       |
|[helpful, agent, resolved, my, issue, quickly.]                  |
|[the, service, was, excellent, and, very, helpful.]              |
|[slow, response, from, the, support, team.]                      |
|[battery, life, is, poor, and, needs, improvement.]              |
|[delivery, was, late, and, the, phone, was, damaged.]            |
|[i, am, disappointed, with, the, damaged, packaging.]            |
|[customer, support, was, not, helpful, at, all.]                 |
|[monitor, quality, is, excellent, and, arrived, on, time.]       |
|[i, am, disappointed, with, the, damaged, packaging.]            |
|[battery, life, is, poor, and, needs, improveme

Remove Stop Words

In [92]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(tokenized_df)
filtered_df.select("filtered_words").show(truncate=False)

+--------------------------------------------------------------+
|filtered_words                                                |
+--------------------------------------------------------------+
|[great, experience, fast, shipping, quality, product.]        |
|[unhelpful, rude, customer, service, representative.]         |
|[helpful, agent, resolved, issue, quickly.]                   |
|[service, excellent, helpful.]                                |
|[slow, response, support, team.]                              |
|[battery, life, poor, needs, improvement.]                    |
|[delivery, late, phone, damaged.]                             |
|[disappointed, damaged, packaging.]                           |
|[customer, support, helpful, all.]                            |
|[monitor, quality, excellent, arrived, time.]                 |
|[disappointed, damaged, packaging.]                           |
|[battery, life, poor, needs, improvement.]                    |
|[repeated, delivery, iss
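
The outputs above show tokens such as `product.` and `all.` with punctuation still attached. A plain-Python sketch of the two steps makes the reason visible: the tokenizer only lowercases and splits on whitespace, and stop words are matched as whole tokens. The stop-word set below is a small hypothetical subset, not Spark's default list.

```python
import re

# A small, hypothetical subset of an English stop-word list
STOP_WORDS = {"the", "was", "and", "with", "not", "at", "all", "is", "my", "i", "am"}

def tokenize(text):
    """Lowercase, then split on whitespace (punctuation stays attached)."""
    return re.split(r"\s", text.lower())

def remove_stop_words(words):
    """Keep only tokens that are not exact matches for a stop word."""
    return [w for w in words if w not in STOP_WORDS]

words = tokenize("Customer support was not helpful at all.")
print(words)
print(remove_stop_words(words))
```

The token `all.` survives because it is not identical to the stop word `all`. The negation `not` is removed, which turns "not helpful" into just "helpful"; that matters for the sentiment labels later on.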

Explode into One Word per Row (Optional)

In [93]:
from pyspark.sql.functions import explode, col

words_df = filtered_df.select(explode(col("filtered_words")).alias("word"))
words_df.show()

+---------------+
|           word|
+---------------+
|          great|
|     experience|
|           fast|
|       shipping|
|        quality|
|       product.|
|      unhelpful|
|           rude|
|       customer|
|        service|
|representative.|
|        helpful|
|          agent|
|       resolved|
|          issue|
|       quickly.|
|        service|
|      excellent|
|       helpful.|
|           slow|
+---------------+
only showing top 20 rows



Word Frequency Count

In [94]:
word_counts = words_df.groupBy("word").count().orderBy("count", ascending=False)
word_counts.show()

+-----------+-----+
|       word|count|
+-----------+-----+
|   delivery|   85|
|    service|   64|
|       late|   51|
|    battery|   45|
|   customer|   40|
|      phone|   40|
|  excellent|   39|
|       slow|   34|
|    support|   34|
|    quality|   34|
|    helpful|   29|
|      great|   27|
|       fast|   27|
|       poor|   27|
|    arrived|   26|
|       life|   23|
|       long|   22|
|    smooth.|   22|
|      lasts|   22|
|performance|   22|
+-----------+-----+
only showing top 20 rows



Remove Punctuation and Normalize Text

In [95]:
from pyspark.sql.functions import regexp_replace, lower

cleaned_df = df.withColumn("text", lower(regexp_replace("text", "[^a-zA-Z\\s]", "")))
cleaned_df.show()

+--------------------+
|                text|
+--------------------+
|great experience ...|
|unhelpful and rud...|
|helpful agent res...|
|the service was e...|
|slow response fro...|
|battery life is p...|
|delivery was late...|
|i am disappointed...|
|customer support ...|
|monitor quality i...|
|i am disappointed...|
|battery life is p...|
|repeated delivery...|
|the service was e...|
|excellent custome...|
|received the wron...|
|customer support ...|
|service was very ...|
|battery life is p...|
|unhelpful and rud...|
+--------------------+
only showing top 20 rows
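
The same cleaning rule can be tried on individual strings in plain Python, since the pattern behaves the same way in Python's `re` module. The second input is a hypothetical sentence, added to show a side effect of the rule.

```python
import re

def clean(text):
    """Drop every character that is not an ASCII letter or whitespace, then lowercase."""
    return re.sub(r"[^a-zA-Z\s]", "", text).lower()

print(clean("Great experience with fast shipping and quality product."))
print(clean("Order #42 arrived 3 days late!"))  # hypothetical input: digits are removed too
```

Digits and accented letters are removed along with punctuation, and removing a standalone token such as `#42` leaves two consecutive spaces behind. Splitting on single whitespace characters afterwards then produces empty tokens, so a follow-up filter may be needed.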



Generate Bigrams

In [96]:
from pyspark.ml.feature import Tokenizer, NGram

tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized_df = tokenizer.transform(cleaned_df)

ngram = NGram(n=2, inputCol="words", outputCol="bigrams")
bigrams_df = ngram.transform(tokenized_df)
bigrams_df.select("bigrams").show(truncate=False)

+---------------------------------------------------------------------------------------------------------+
|bigrams                                                                                                  |
+---------------------------------------------------------------------------------------------------------+
|[great experience, experience with, with fast, fast shipping, shipping and, and quality, quality product]|
|[unhelpful and, and rude, rude customer, customer service, service representative]                       |
|[helpful agent, agent resolved, resolved my, my issue, issue quickly]                                    |
|[the service, service was, was excellent, excellent and, and very, very helpful]                         |
|[slow response, response from, from the, the support, support team]                                      |
|[battery life, life is, is poor, poor and, and needs, needs improvement]                                 |
|[delivery was, was late, la
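
A bigram is just every pair of adjacent tokens joined by a space. The sliding window can be sketched in plain Python with a hypothetical `ngrams` helper:

```python
def ngrams(words, n=2):
    """Join each run of n consecutive words with a space."""
    return [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]

words = ["helpful", "agent", "resolved", "my", "issue", "quickly"]
print(ngrams(words))
print(ngrams(words, n=3))
```

A sentence with k tokens yields k - n + 1 n-grams, and none at all when it has fewer than n tokens.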

Label the data (simple rule-based heuristic)

In [97]:
from pyspark.sql.functions import when, col

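# Add binary sentiment label: 1 = Positive, 0 = Negative
# Caveat: rlike is a case-sensitive regex search, so "helpful" also matches inside
# "Unhelpful", and a capitalized "Great" does not match "great".
labeled_df = df.withColumn("label", when(
    col("text").rlike("excellent|helpful|happy|great|fast|smooth|impressive|resolved"),
    1
).otherwise(0))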
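
The keyword rule is easy to test outside Spark. The sketch below compares the pattern as written with a stricter variant that uses word boundaries and ignores case; the `strict` pattern is a suggested alternative, not what the notebook runs.

```python
import re

POSITIVE = "excellent|helpful|happy|great|fast|smooth|impressive|resolved"

naive = re.compile(POSITIVE)  # case-sensitive substring search, like the rlike call
strict = re.compile(rf"\b(?:{POSITIVE})\b", re.IGNORECASE)  # whole words, any case

def label(pattern, text):
    """Return 1 if the pattern is found anywhere in the text, else 0."""
    return 1 if pattern.search(text) else 0

text = "Unhelpful and rude customer service representative."
print(label(naive, text))   # matches "helpful" inside "Unhelpful"
print(label(strict, text))  # no whole-word match
```

Since `rlike` uses Java regular expressions, a pattern of the form `(?i)\b(...)\b` should behave similarly in Spark. Neither variant handles negation: "not helpful" is still labeled positive.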

Tokenize + Remove Stop Words

In [98]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized = tokenizer.transform(labeled_df)

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered = remover.transform(tokenized)


Convert to TF-IDF features

In [99]:
from pyspark.ml.feature import HashingTF, IDF

tf = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=1000)
tf_df = tf.transform(filtered)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)
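
To see what the two stages compute, here is a plain-Python sketch on three short token lists (hypothetical documents, loosely based on the feedback lines). The hashing trick maps each term to one of `NUM_FEATURES` buckets; CRC32 stands in for Spark's hash function. The IDF weight follows the smoothed formula given in the Spark MLlib documentation, log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents containing the term.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 1000

def bucket(term):
    """Hashing trick: map a term to a fixed index (CRC32 as a stand-in hash)."""
    return zlib.crc32(term.encode("utf-8")) % NUM_FEATURES

def term_frequencies(words):
    """Sparse term-frequency vector as {bucket_index: count}."""
    return Counter(bucket(w) for w in words)

docs = [
    ["service", "excellent", "helpful"],
    ["slow", "response", "support", "team"],
    ["customer", "support", "helpful"],
]
tf = [term_frequencies(d) for d in docs]

# Document frequency: in how many documents each bucket appears
df = Counter(index for vec in tf for index in vec)
m = len(docs)
idf = {index: math.log((m + 1) / (count + 1)) for index, count in df.items()}

tfidf = [{index: count * idf[index] for index, count in vec.items()} for vec in tf]
print(tfidf[0])
```

Terms that occur in many documents get a weight near zero, while rare terms get a larger one. Two different terms can land in the same bucket; a larger `numFeatures` makes such collisions less likely.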


Train a Logistic Regression classifier

In [100]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(tfidf_df)


Predict sentiment on the same dataset

In [101]:
predictions = model.transform(tfidf_df)
predictions.select("text", "label", "prediction", "probability").show(10, truncate=False)

+--------------------------------------------------------+-----+----------+------------------------------------------+
|text                                                    |label|prediction|probability                               |
+--------------------------------------------------------+-----+----------+------------------------------------------+
|Great experience with fast shipping and quality product.|1    |1.0       |[2.184005874752603E-9,0.9999999978159941] |
|Unhelpful and rude customer service representative.     |1    |1.0       |[3.2615955287345573E-9,0.9999999967384045]|
|Helpful agent resolved my issue quickly.                |1    |1.0       |[3.0249932894456743E-9,0.9999999969750067]|
|The service was excellent and very helpful.             |1    |1.0       |[7.139534972400031E-9,0.999999992860465]  |
|Slow response from the support team.                    |0    |0.0       |[0.9999999961748471,3.8251528611255026E-9]|
|Battery life is poor and needs improvement.    

Now classify a new sentence. First, create a DataFrame with the input.

In [104]:
from pyspark.sql import Row

# Your new sentence
input_text = "The delivery was delayed and took longer than expected."

# Create DataFrame
new_df = spark.createDataFrame([Row(text=input_text)])


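Preprocess, train, and predict in one cell. The new sentence must pass through the same fitted tokenizer, stop-word remover, hashing, and IDF model as the training data.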

In [114]:
from pyspark.sql import Row
from pyspark.sql.functions import when, col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

# Load and label text
df = spark.read.text("feedback_300.txt").withColumnRenamed("value", "text")
df = df.withColumn("label", when(col("text").rlike("excellent|helpful|happy|great|fast|smooth|impressive|resolved"), 1).otherwise(0))

# NLP pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Transform training data
words_data = tokenizer.transform(df)
filtered_data = remover.transform(words_data)
tf_data = hashingTF.transform(filtered_data)
idf_model = idf.fit(tf_data)
train_data = idf_model.transform(tf_data).select("features", "label")

# Train model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)

# Predict single sentence
test_df = spark.createDataFrame([Row(text="The delivery was delayed and took longer than expected.")])
test_words = tokenizer.transform(test_df)
test_filtered = remover.transform(test_words)
test_tf = hashingTF.transform(test_filtered)
test_final = idf_model.transform(test_tf)
model.transform(test_final.select("features")).select("prediction", "probability").show(truncate=False)


+----------+----------------------------------------+
|prediction|probability                             |
+----------+----------------------------------------+
|0.0       |[0.9862279669633595,0.01377203303664054]|
+----------+----------------------------------------+
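
The `probability` column holds the pair [P(label = 0), P(label = 1)]. For binary logistic regression, P(label = 1) is the sigmoid of a linear score (weights times features plus intercept). The notebook does not print that score, so the sketch below recovers it from the reported probabilities as the log-odds and checks that the sigmoid maps it back.

```python
import math

def sigmoid(z):
    """Logistic function: maps a real-valued score to a probability."""
    return 1.0 / (1.0 + math.exp(-z))

# Probabilities reported for the test sentence: [P(label=0), P(label=1)]
p0, p1 = 0.9862279669633595, 0.01377203303664054

# The linear score is not shown in the notebook; recover it as the log-odds
margin = math.log(p1 / p0)

print(round(margin, 3))
print(sigmoid(margin))
print(1.0 if sigmoid(margin) > 0.5 else 0.0)  # predicted label at a 0.5 threshold
```

A negative score corresponds to P(label = 1) below 0.5, hence the prediction of 0.0 (negative) for the delayed-delivery sentence.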



# Spark commands

Word Frequency Analysis with PySpark DataFrames

This task processes a plain text file (feedback.txt) using PySpark’s DataFrame API to perform a distributed word count. It reads each line of text, splits the content into lowercase words, filters out empty or non-word tokens, counts how often each word appears, and displays the results sorted by frequency.

The full pipeline includes:

1. Loading the text file as a DataFrame.

2. Tokenizing and normalizing each line into individual lowercase words.

3. Removing noise, such as empty tokens or punctuation-only entries.

4. Counting occurrences of each word using groupBy().count().

5. Displaying results, ordered by frequency in descending order.

This is a typical entry-level NLP/data-cleaning task for log analysis, sentiment preprocessing, or indexing pipelines — easily scalable with Spark.
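
Before running the job on a cluster, the five steps above can be sketched in plain Python on a few lines of text. The three sample lines stand in for the contents of `feedback.txt`.

```python
import re
from collections import Counter

# Stand-in for the lines of feedback.txt
lines = [
    "Great experience with fast shipping and quality product.",
    "The service was excellent and very helpful.",
    "Slow response from the support team.",
]

words = []
for line in lines:                                   # 1. load
    tokens = re.split(r"\W+", line.lower())          # 2. tokenize + lowercase
    words.extend(t for t in tokens if t != "")       # 3. drop empty tokens

counts = Counter(words)                              # 4. count
for word, count in counts.most_common(5):            # 5. sort by frequency
    print(word, count)
```

Splitting on `\W+` removes punctuation during tokenization, so `product.` and `product` are counted as the same word, unlike in the earlier `Tokenizer` example.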

In [None]:
# word_count_df.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, col

# Create Spark session
spark = SparkSession.builder.appName("WordCountDF").getOrCreate()

# Read the file into a DataFrame (each line is one row)
df = spark.read.text("feedback.txt").withColumnRenamed("value", "line")

# Split into words, convert to lowercase, and explode into individual rows
words_df = df.select(explode(split(lower(col("line")), r"\W+")).alias("word"))

# Remove empty words (from punctuation, extra spaces, etc.)
words_df = words_df.filter(col("word") != "")

# Group and count word occurrences
word_counts_df = words_df.groupBy("word").count().orderBy("count", ascending=False)

# Show top results
word_counts_df.show()

spark.stop()

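Run the script with `spark-submit`:

```
spark-submit your_script.py
```

With explicit options (local mode with 4 threads, 2 GB executor memory, 10 shuffle partitions):

```
spark-submit --master local[4] \
             --executor-memory 2g \
             --conf spark.sql.shuffle.partitions=10 \
             word_count_df.py feedback_300.txt
```

> Note: the script above reads `feedback.txt` by name, so the trailing `feedback_300.txt` argument only takes effect if the script is changed to read its input path from `sys.argv`.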
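
One way to let the script accept the file name passed on the command line is to read it from `sys.argv` and fall back to a default. This is a minimal sketch; `input_path` and `DEFAULT_PATH` are hypothetical names, not part of the original script.

```python
import sys

DEFAULT_PATH = "feedback.txt"

def input_path(argv):
    """Return the first command-line argument, or the default path if none is given."""
    return argv[1] if len(argv) > 1 else DEFAULT_PATH

path = input_path(sys.argv)
print("Reading from:", path)
# In the script, this would replace the hard-coded file name:
# df = spark.read.text(path).withColumnRenamed("value", "line")
```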

Check the installed Spark version:

`spark-submit --version`