In [1]:
# Install Java and Spark
!apt-get update -y
!apt-get install openjdk-11-jdk -y
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.2.tgz
!tar xf spark-3.3.0-bin-hadoop3.2.tgz
!pip install findspark


Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease                                              
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]                           
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]                             
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]                                
Get:7 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [62.9 kB]                 
Get:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]               
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,306 kB]
Get:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Get:11 https://r2u.stat.illin

In [2]:
!pip install pyspark findspark pandas




In [3]:
# Step 1: Initialize PySpark
import findspark
findspark.init()

In [4]:
# Step 2: Import Necessary Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, avg, sum, min, max, desc, row_number
from pyspark.sql.window import Window
import pandas as pd

In [5]:


# Create a Spark session
spark = SparkSession.builder.master("local[2]").appName("TitanicAnalysis").getOrCreate()


In [6]:
!wget https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv


--2025-02-05 18:31:04--  https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60302 (59K) [text/plain]
Saving to: ‘titanic.csv’


2025-02-05 18:31:04 (7.46 MB/s) - ‘titanic.csv’ saved [60302/60302]



In [7]:
# Load Titanic dataset from local file
df = spark.read.csv('titanic.csv', header=True, inferSchema=True)
df.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [8]:
# Step 5: Display First 5 Rows
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [9]:
# Step 6: Print Schema
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [10]:
# Step 7: Count Total Rows and Columns
print(f"Total Rows: {df.count()}, Total Columns: {len(df.columns)}")

Total Rows: 891, Total Columns: 12


In [11]:
# Step 8: Display Summary Statistics
df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                NULL|  NULL| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

In [12]:
# Step 9: Check for Missing Values
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [13]:
# Step 10: Fill Missing Values
df = df.fillna({'Age': df.select(avg(col("Age"))).collect()[0][0], 'Embarked': 'Unknown'})

In [14]:
# Step 11: Select Specific Columns
df_selected = df.select("PassengerId", "Name", "Sex", "Age", "Fare", "Pclass", "Survived")
df_selected.show(5)

+-----------+--------------------+------+----+-------+------+--------+
|PassengerId|                Name|   Sex| Age|   Fare|Pclass|Survived|
+-----------+--------------------+------+----+-------+------+--------+
|          1|Braund, Mr. Owen ...|  male|22.0|   7.25|     3|       0|
|          2|Cumings, Mrs. Joh...|female|38.0|71.2833|     1|       1|
|          3|Heikkinen, Miss. ...|female|26.0|  7.925|     3|       1|
|          4|Futrelle, Mrs. Ja...|female|35.0|   53.1|     1|       1|
|          5|Allen, Mr. Willia...|  male|35.0|   8.05|     3|       0|
+-----------+--------------------+------+----+-------+------+--------+
only showing top 5 rows



In [15]:
# Step 12: Filter Data (Passengers with Age > 18 and Fare > 50)
df_filtered = df_selected.filter((col("Age") > 18) & (col("Fare") > 50))
df_filtered.show(5)

+-----------+--------------------+------+-----------------+--------+------+--------+
|PassengerId|                Name|   Sex|              Age|    Fare|Pclass|Survived|
+-----------+--------------------+------+-----------------+--------+------+--------+
|          2|Cumings, Mrs. Joh...|female|             38.0| 71.2833|     1|       1|
|          4|Futrelle, Mrs. Ja...|female|             35.0|    53.1|     1|       1|
|          7|McCarthy, Mr. Tim...|  male|             54.0| 51.8625|     1|       0|
|         28|Fortune, Mr. Char...|  male|             19.0|   263.0|     1|       0|
|         32|Spencer, Mrs. Wil...|female|29.69911764705882|146.5208|     1|       1|
+-----------+--------------------+------+-----------------+--------+------+--------+
only showing top 5 rows



In [16]:
# Step 13: Group By & Aggregation (Survival Rate by Gender)
df_grouped = df.groupBy("Sex").agg(avg("Survived").alias("Survival Rate")).orderBy(desc("Survival Rate"))
df_grouped.show()

+------+-------------------+
|   Sex|      Survival Rate|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



In [17]:
# Step 14: Use SQL in PySpark (Convert DataFrame to SQL Table)
df.createOrReplaceTempView("titanic")

In [18]:
# Step 15: Run SQL Query (Top 5 Oldest Passengers)
spark.sql("SELECT Name, Age, Pclass FROM titanic ORDER BY Age DESC LIMIT 5").show()

+--------------------+----+------+
|                Name| Age|Pclass|
+--------------------+----+------+
|Barkworth, Mr. Al...|80.0|     1|
| Svensson, Mr. Johan|74.0|     3|
|Artagaveytia, Mr....|71.0|     1|
|Goldschmidt, Mr. ...|71.0|     1|
|Connors, Mr. Patrick|70.5|     3|
+--------------------+----+------+



In [19]:
# Step 16: Join Operation (Self Join Example on Same Dataset)
df_joined = df.alias("df1").join(df.alias("df2"), col("df1.Pclass") == col("df2.Pclass"), "inner")
df_joined.select("df1.Name", "df2.Name", "df1.Pclass").show(5)

+--------------------+--------------------+------+
|                Name|                Name|Pclass|
+--------------------+--------------------+------+
|Braund, Mr. Owen ...| Dooley, Mr. Patrick|     3|
|Braund, Mr. Owen ...|"Johnston, Miss. ...|     3|
|Braund, Mr. Owen ...|Rice, Mrs. Willia...|     3|
|Braund, Mr. Owen ...|Sutehall, Mr. Hen...|     3|
|Braund, Mr. Owen ...|Dahlberg, Miss. G...|     3|
+--------------------+--------------------+------+
only showing top 5 rows



In [20]:
# Step 17: Window Function (Ranking Passengers by Fare)
windowSpec = Window.orderBy(desc("Fare"))
df_with_rank = df.withColumn("Rank", row_number().over(windowSpec))
df_with_rank.select("Name", "Fare", "Rank").show(5)

+--------------------+--------+----+
|                Name|    Fare|Rank|
+--------------------+--------+----+
|    Ward, Miss. Anna|512.3292|   1|
|Cardeza, Mr. Thom...|512.3292|   2|
|Lesurer, Mr. Gust...|512.3292|   3|
|Fortune, Miss. Ma...|   263.0|   4|
|Fortune, Mr. Char...|   263.0|   5|
+--------------------+--------+----+
only showing top 5 rows



In [21]:
# Step 18: Convert PySpark DataFrame to Pandas
pandas_df = df.toPandas()
print(pandas_df.head())

   PassengerId  Survived  Pclass  \
0            1         0       3   
1            2         1       1   
2            3         1       3   
3            4         1       1   
4            5         0       3   

                                                Name     Sex   Age  SibSp  \
0                            Braund, Mr. Owen Harris    male  22.0      1   
1  Cumings, Mrs. John Bradley (Florence Briggs Th...  female  38.0      1   
2                             Heikkinen, Miss. Laina  female  26.0      0   
3       Futrelle, Mrs. Jacques Heath (Lily May Peel)  female  35.0      1   
4                           Allen, Mr. William Henry    male  35.0      0   

   Parch            Ticket     Fare Cabin Embarked  
0      0         A/5 21171   7.2500  None        S  
1      0          PC 17599  71.2833   C85        C  
2      0  STON/O2. 3101282   7.9250  None        S  
3      0            113803  53.1000  C123        S  
4      0            373450   8.0500  None        S  


In [22]:
# Step 19: Convert Pandas DataFrame to PySpark
spark_df_from_pandas = spark.createDataFrame(pandas_df)
spark_df_from_pandas.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [23]:
# Step 20: Save Processed Data to a CSV File
df.write.csv("processed_titanic.csv", header=True)

In [24]:
# Step 21: Stop Spark Session
spark.stop()

In [25]:
import pandas as pd
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("SpeedTest").getOrCreate()

# Creating a large dataset (1 million rows)
data = {"id": range(1, 1000001), "value": range(1000000, 2000000)}  # Adjusted the range for 'value'

# Pandas Test
start_time = time.time()
df_pandas = pd.DataFrame(data)
df_pandas["value_squared"] = df_pandas["value"] ** 2  # Squaring a column
print("Pandas Time:", time.time() - start_time, "seconds")

# PySpark Test
start_time = time.time()
df_spark = spark.createDataFrame(pd.DataFrame(data))
df_spark = df_spark.withColumn("value_squared", col("value") ** 2)  # Squaring a column
df_spark.show(5)
print("PySpark Time:", time.time() - start_time, "seconds")


Pandas Time: 0.04015064239501953 seconds
+---+-------+-----------------+
| id|  value|    value_squared|
+---+-------+-----------------+
|  1|1000000|           1.0E12|
|  2|1000001|1.000002000001E12|
|  3|1000002|1.000004000004E12|
|  4|1000003|1.000006000009E12|
|  5|1000004|1.000008000016E12|
+---+-------+-----------------+
only showing top 5 rows

PySpark Time: 21.550978660583496 seconds


In [26]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Simulating data for 10 million transactions
n_rows = 10000000
data = {
    "customer_id": np.random.randint(1, 500000, size=n_rows),
    "product_id": np.random.randint(1, 10000, size=n_rows),
    "timestamp": [datetime.now() - timedelta(days=np.random.randint(0, 30)) for _ in range(n_rows)],
    "quantity": np.random.randint(1, 10, size=n_rows),
    "price": np.random.uniform(5, 500, size=n_rows),
    "category": np.random.choice(['Electronics', 'Clothing', 'Home', 'Books', 'Beauty'], size=n_rows),
    "store_location": np.random.choice(['NY', 'LA', 'SF', 'Chicago'], size=n_rows),
    "payment_method": np.random.choice(['Credit', 'Debit', 'PayPal', 'Cash'], size=n_rows)
}

df = pd.DataFrame(data)




In [27]:
# Save to CSV for later use in PySpark
df.to_csv('large_ecommerce_transactions.csv', index=False)

In [28]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("EcommerceAnalysis").getOrCreate()

# Load the data into PySpark DataFrame
df_spark = spark.read.csv("large_ecommerce_transactions.csv", header=True, inferSchema=True)

# Show the schema to verify
df_spark.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- category: string (nullable = true)
 |-- store_location: string (nullable = true)
 |-- payment_method: string (nullable = true)



In [29]:
from pyspark.sql.functions import sum, avg, count

# Calculate total spend per customer
customer_spend = df_spark.groupBy("customer_id").agg(
    sum("price").alias("total_spent"),
    count("product_id").alias("purchase_count"),
    avg("price").alias("avg_spent_per_item")
)

# Show top 10 customers by total spend
customer_spend.orderBy("total_spent", ascending=False).show(10)


+-----------+------------------+--------------+------------------+
|customer_id|       total_spent|purchase_count|avg_spent_per_item|
+-----------+------------------+--------------+------------------+
|      92892|11962.376586247785|            40|299.05941465619463|
|     182598|11734.744080147044|            40| 293.3686020036761|
|     423768|11640.544138920972|            41|283.91571070538953|
|     146391|11455.111367612324|            40| 286.3777841903081|
|      16667| 11356.46996053375|            38|  298.854472645625|
|     138550| 11314.97826394082|            36| 314.3049517761339|
|     211963|11208.079202264254|            39|287.38664621190395|
|      92659|  11197.7613731752|            33|339.32610221743033|
|     136212|11183.620709482999|            38| 294.3058081442894|
|     242070| 11154.96383273799|            36| 309.8601064649442|
+-----------+------------------+--------------+------------------+
only showing top 10 rows



In [30]:
# Group by product and category to find the most popular products
product_sales = df_spark.groupBy("product_id", "category").agg(
    sum("quantity").alias("total_sales"),
    sum("price").alias("total_revenue")
).orderBy("total_sales", ascending=False)

product_sales.show(10)


+----------+-----------+-----------+------------------+
|product_id|   category|total_sales|     total_revenue|
+----------+-----------+-----------+------------------+
|      8039|     Beauty|       1346|64675.532696662136|
|      2170|       Home|       1336| 64479.52902103612|
|      2403|      Books|       1326|  62125.4629707797|
|      9884|     Beauty|       1320|64735.875146384584|
|      1166|Electronics|       1320| 69041.05813436535|
|      3991|   Clothing|       1319|60501.681785976296|
|       624|      Books|       1312|   62133.655756085|
|       873|   Clothing|       1302| 63056.01226189673|
|      1984|     Beauty|       1295| 62769.96621621522|
|      4812|   Clothing|       1289| 59532.93994021196|
+----------+-----------+-----------+------------------+
only showing top 10 rows



In [31]:
from pyspark.sql.functions import hour

# Extract hour from timestamp
df_spark = df_spark.withColumn("hour", hour("timestamp"))

# Group by hour of day to find peak sales hours
sales_by_hour = df_spark.groupBy("hour").agg(
    sum("quantity").alias("total_sales"),
    sum("price").alias("total_revenue")
).orderBy("total_sales", ascending=False)

sales_by_hour.show()


+----+-----------+--------------------+
|hour|total_sales|       total_revenue|
+----+-----------+--------------------+
|  18|   49993455|2.5252163586712136E9|
+----+-----------+--------------------+



In [32]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Prepare the data for ALS model
ratings_df = df_spark.select("customer_id", "product_id", "price").withColumnRenamed("price", "rating")

# Train a collaborative filtering model using ALS
als = ALS(userCol="customer_id", itemCol="product_id", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(ratings_df)

# Make recommendations for all customers
recommendations = model.recommendForAllUsers(5)
recommendations.show()


+-----------+--------------------+
|customer_id|     recommendations|
+-----------+--------------------+
|          1|[{1820, 692.85736...|
|         12|[{5281, 634.515},...|
|         22|[{2187, 996.1738}...|
|         26|[{4699, 1074.7286...|
|         27|[{30, 756.7236}, ...|
|         28|[{6433, 733.2334}...|
|         31|[{6532, 1391.7119...|
|         34|[{591, 905.0293},...|
|         44|[{3387, 686.19855...|
|         47|[{9562, 1071.4802...|
|         53|[{9736, 1379.1587...|
|         65|[{3954, 1473.2566...|
|         76|[{2505, 1062.7957...|
|         78|[{8944, 1236.5356...|
|         81|[{278, 1429.5459}...|
|         85|[{2559, 918.63165...|
|         91|[{4283, 1200.3527...|
|         93|[{4610, 1492.9758...|
|        101|[{5447, 862.263},...|
|        103|[{2637, 618.62366...|
+-----------+--------------------+
only showing top 20 rows



In [33]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
predictions = model.transform(ratings_df)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error (RMSE) for recommendation:", rmse)


Root-mean-square error (RMSE) for recommendation: 97.36442893415366
