<a href="https://colab.research.google.com/github/CodingErik/jupyter-notebook-pyspark/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark




In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit

# Initialize Spark Session
spark = SparkSession.builder.appName("BigDataTest").getOrCreate()

# Generate a large dataset (1M rows)
num_rows = 10**6  # 1 million rows
data = [(i, f"User_{i}") for i in range(num_rows)]

# Creating another dataset (Orders)
order_data = [(i, f"Product_{i%10}", i*10) for i in range(500_000, 1_000_000)]  # Orders from 500K to 1M

# creating our dataframes
userDF = spark.createDataFrame(data, ["ID", "Name"])
orderDF = spark.createDataFrame(order_data, ["UserID", "Product", "Amount"])

# Add a random Age column (0-99) and also adding a Country column with a USA constant
userDF = userDF.withColumn("Age", (rand(seed=42) * 100).cast("int"))
userDF = userDF.withColumn("Country", lit("USA"))

# Show some sample data
# df.show(5)



In [19]:
userDF.show(5)

userDF.withColumnRenamed()

+---+------+---+-------+
| ID|  Name|Age|Country|
+---+------+---+-------+
|  0|User_0| 61|    USA|
|  1|User_1| 50|    USA|
|  2|User_2| 83|    USA|
|  3|User_3| 26|    USA|
|  4|User_4| 67|    USA|
+---+------+---+-------+
only showing top 5 rows



In [20]:
orderDF.show(5)

+------+---------+-------+
|UserID|  Product| Amount|
+------+---------+-------+
|500000|Product_0|5000000|
|500001|Product_1|5000010|
|500002|Product_2|5000020|
|500003|Product_3|5000030|
|500004|Product_4|5000040|
+------+---------+-------+
only showing top 5 rows



In [22]:


inner_join_df = userDF.join(orderDF, userDF.ID == orderDF.UserID, "inner")

filtered_inner_join_df = inner_join_df.select(userDF.ID, userDF.Name, orderDF.Product, orderDF.Amount).show(5)
# inner_join_df.show(10)

+------+-----------+---------+-------+
|    ID|       Name|  Product| Amount|
+------+-----------+---------+-------+
|500001|User_500001|Product_1|5000010|
|500003|User_500003|Product_3|5000030|
|500004|User_500004|Product_4|5000040|
|500005|User_500005|Product_5|5000050|
|500006|User_500006|Product_6|5000060|
+------+-----------+---------+-------+
only showing top 5 rows



In [None]:
nums = list(range(0, 1000001))
len(nums)

nums_rdd = spark.sparkContext.parallelize(nums)
nums_rdd.limit(10).collect() # it's distributing the list, and this is our driver program

In [24]:
from pyspark.sql.functions import col, rand, floor, concat, lit, substring

# Number of records
num_practitioners = 500_000  # Half a million practitioners
num_pracRoles = 1_000_000  # One million practitioner roles
num_locations = 100_000  # 100K locations

# Practitioner DataFrame (500K rows)
practitionerDF = spark.range(num_practitioners).toDF("PractitionerID") \
    .withColumn("Name", concat(lit("Dr. "), col("PractitionerID").cast("string"))) \
    .withColumn("Specialty", concat(lit("Specialty_"), (floor(rand(seed=42) * 10)).cast("string")))

# Location DataFrame (100K rows)
locationDF = spark.range(num_locations).toDF("LocationID") \
    .withColumn("FacilityName", concat(lit("Clinic_"), col("LocationID").cast("string"))) \
    .withColumn("City", concat(lit("City_"), (floor(rand(seed=99) * 50)).cast("string")))

# PractitionerRole DataFrame (1M rows) - Driving Table
pracRoleDF = spark.range(num_pracRoles).toDF("RoleID") \
    .withColumn("PractitionerID", (floor(rand(seed=21) * num_practitioners)).cast("int")) \
    .withColumn("LocationID", (floor(rand(seed=45) * num_locations)).cast("int")) \
    .withColumn("Role", concat(lit("Role_"), (floor(rand(seed=15) * 5)).cast("string")))



In [None]:
# tuning cache behavior
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", True)  # ✅ Enables compression
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)  # ✅ Adjust batch size for caching

In [None]:
spark.conf.get("spark.memory.fraction")  # Default is 0.6 (60% of JVM heap)
spark.conf.get("spark.memory.storageFraction")  # Default is 0.5 (50% of execution memory)

In [None]:
# check how a df is cached
print(pracRoleDF.storageLevel)

In [31]:
print(type(spark))  # <class 'pyspark.sql.session.SparkSession'>
print(type(pracRoleDF))  # <class 'pyspark.sql.dataframe.DataFrame'>
print(type(pracRoleDF.RoleID))  # <class 'pyspark.sql.column.Column'>


<class 'pyspark.sql.session.SparkSession'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.column.Column'>


In [29]:
practitionerDF.cache()
locationDF.cache()
pracRoleDF.cache()

DataFrame[RoleID: bigint, PractitionerID: int, LocationID: int, Role: string]

In [30]:
practitionerDF.is_cached
locationDF.is_cached
pracRoleDF.is_cached

True

In [None]:
# Show a sample
print("Practitioner Sample:")
practitionerDF.show(5)

print("Location Sample:")
locationDF.show(5)

print("PractitionerRole Sample (Driving Table):")
pracRoleDF.show(5)


In [28]:
joined_large_df = (
    pracRoleDF
    .join(practitionerDF, "PractitionerID", "inner")
    .join(locationDF, "LocationID", "left")
    .select(
        pracRoleDF.RoleID.alias("lito"),
        pracRoleDF.PractitionerID,
        practitionerDF.Name,
        practitionerDF.Specialty,
        locationDF.FacilityName,
        locationDF.City
    )
)
# Show a small sample from the massive join
joined_large_df.show(10)


+------+--------------+------+-----------+------------+-------+
|RoleID|PractitionerID|  Name|  Specialty|FacilityName|   City|
+------+--------------+------+-----------+------------+-------+
|160537|             5| Dr. 5|Specialty_5|Clinic_92674|City_31|
|414712|             5| Dr. 5|Specialty_5|Clinic_59696|City_36|
|470600|             5| Dr. 5|Specialty_5|Clinic_10366| City_2|
|620430|             5| Dr. 5|Specialty_5|Clinic_15243|City_14|
|970123|             5| Dr. 5|Specialty_5|Clinic_29953| City_3|
|972503|             5| Dr. 5|Specialty_5|Clinic_10502|City_42|
|363365|             7| Dr. 7|Specialty_0|Clinic_14545|City_21|
|183093|             9| Dr. 9|Specialty_7|Clinic_69161|City_37|
| 39954|            10|Dr. 10|Specialty_4|Clinic_45239|City_25|
|344445|            10|Dr. 10|Specialty_4|Clinic_21695| City_5|
+------+--------------+------+-----------+------------+-------+
only showing top 10 rows

