#Download and install Apache Spark

In [1]:
!wget https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
#unzip the compressed file
!tar xf spark-3.4.0-bin-hadoop3.tgz

--2023-06-05 07:54:05--  https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 388407094 (370M) [application/x-gzip]
Saving to: ‘spark-3.4.0-bin-hadoop3.tgz.1’


2023-06-05 07:54:09 (94.8 MB/s) - ‘spark-3.4.0-bin-hadoop3.tgz.1’ saved [388407094/388407094]



In [2]:
#install findspark
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
#Setup environment
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

In [4]:
#Initialize Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [5]:
#Create the SparkSession object
spark = SparkSession.builder \
    .appName("RandomUserDataFrame") \
    .getOrCreate()

#Generate a random user dataset

##User dataframe

In [6]:
from pyspark.sql.functions import rand, randn, when, floor, round
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import random

def generate_users_dataset(nbr_users):
  # Define the user dataframe's schema
  schema = StructType([
      StructField('user_id', IntegerType(), True),
      StructField('user_age', IntegerType(), True),
      StructField('user_sex', StringType(), True),
      StructField('user_region', StringType(), True),
      StructField('user_income', IntegerType(), True),
  ])

  #Define the region data
  regions = ['East', 'West', 'North', 'South', 'Northeast', 'Southeast', 'Northwest', 'Southwest']

  # Generate user columns
  df = spark.range(0, nbr_users) \
      .withColumn('user_id', (floor(rand() * 90000000) + 10000000)) \
      .withColumn('user_age', (randn() * 10 + 30).cast(IntegerType())) \
      .withColumn('user_sex', when(rand() < 0.5, 'Male').otherwise('Female')) \
      .withColumn('user_region', when(rand() < 0.5, regions[0])
                            .when(rand() < 0.6, regions[1])
                            .when(rand() < 0.7, regions[2])
                            .when(rand() < 0.8, regions[3])
                            .when(rand() < 0.8, regions[4])
                            .when(rand() < 0.8, regions[5])
                            .when(rand() < 0.8, regions[6])
                            .otherwise(regions[7])) \
      .withColumn('user_income', (randn() * 10000 + 50000).cast(IntegerType())) \
      .select('user_id', 'user_age', 'user_sex', 'user_region','user_income')
  return df


user_dataframe = generate_users_dataset(10000)
user_dataframe.show()

+--------+--------+--------+-----------+-----------+
| user_id|user_age|user_sex|user_region|user_income|
+--------+--------+--------+-----------+-----------+
|55147647|      35|  Female|       West|      48445|
|84711057|      17|    Male|       East|      26028|
|52634332|      29|    Male|       East|      48815|
|73382092|      36|    Male|       West|      63689|
|92881442|      42|    Male|       East|      38200|
|26058944|      12|    Male|      North|      57054|
|12258011|      34|    Male|       East|      40727|
|60554025|      32|  Female|       East|      56925|
|65119333|      40|    Male|       West|      61973|
|89570373|      31|    Male|       East|      45085|
|10894949|      22|    Male|       East|      70978|
|51179911|      24|  Female|       East|      57207|
|29571465|      33|  Female|       West|      52364|
|97622045|      17|    Male|       East|      44039|
|73150237|      39|    Male|      North|      43168|
|23860223|      35|  Female|       East|      

##Item dataframe

In [7]:
def generate_items_dataset(nbr_items):
  # Define the item dataframe's schema
  item_schema = StructType([
      StructField('item_id', IntegerType(), True),
      StructField('item_category', StringType(), True),
      StructField('item_price', IntegerType(), True)
  ])

  #Define the category data
  categories = ['PC Hardware', 'PC Software', 'Console', 'Smartphone', 'Accessory', 'Other']

  # Generate item columns
  df = spark.range(0, nbr_items) \
      .withColumn('item_id', (floor(rand() * 90000000) + 10000000)) \
      .withColumn('item_category', when(rand() < 0.5, categories[0])
                            .when(rand() < 0.6, categories[1])
                            .when(rand() < 0.7, categories[2])
                            .when(rand() < 0.8, categories[3])
                            .when(rand() < 0.8, categories[4])
                            .otherwise(categories[5])) \
      .withColumn('item_price', round(rand() * (500 - 0.1) + 0.1, 2)) \
      .select('item_id', 'item_category', 'item_price')
  return df  

item_dataframe = generate_items_dataset(500)
item_dataframe.show()

+--------+-------------+----------+
| item_id|item_category|item_price|
+--------+-------------+----------+
|11778083|  PC Hardware|    348.99|
|85154949|  PC Hardware|    413.99|
|99065176|   Smartphone|    393.47|
|94953229|  PC Hardware|    251.35|
|71666950|  PC Hardware|    167.94|
|22360888|   Smartphone|     242.7|
|92978785|  PC Software|    451.82|
|82831742|  PC Software|    181.28|
|67110562|  PC Hardware|    160.55|
|75985073|      Console|    345.29|
|13198260|  PC Hardware|    359.16|
|79672484|  PC Software|    147.44|
|53489111|  PC Hardware|    272.76|
|91391541|  PC Hardware|    208.82|
|52708859|  PC Software|    333.82|
|43320132|      Console|    483.38|
|46289723|  PC Hardware|    157.55|
|53616257|  PC Software|     68.31|
|76472959|  PC Hardware|     79.71|
|36575806|  PC Software|    111.46|
+--------+-------------+----------+
only showing top 20 rows



##user-item interaction dataframe

In [8]:
interaction_df = user_dataframe.crossJoin(item_dataframe).sample(False, 0.2)
interaction_df.show()

+--------+--------+--------+-----------+-----------+--------+-------------+----------+
| user_id|user_age|user_sex|user_region|user_income| item_id|item_category|item_price|
+--------+--------+--------+-----------+-----------+--------+-------------+----------+
|55147647|      35|  Female|       West|      48445|11778083|  PC Hardware|    348.99|
|55147647|      35|  Female|       West|      48445|22360888|   Smartphone|     242.7|
|55147647|      35|  Female|       West|      48445|43320132|      Console|    483.38|
|55147647|      35|  Female|       West|      48445|53616257|  PC Software|     68.31|
|55147647|      35|  Female|       West|      48445|94270929|  PC Hardware|    265.04|
|55147647|      35|  Female|       West|      48445|24560422|      Console|    111.32|
|55147647|      35|  Female|       West|      48445|43875032|      Console|    192.47|
|55147647|      35|  Female|       West|      48445|77928476|  PC Software|     39.09|
|55147647|      35|  Female|       West|   

##Export the dataframe into csv file

In [11]:
interaction_df.coalesce(1).write.option("header", "false").csv("user_item_interaction.csv")