# Setting Up PySpark

In [1]:
# Install PySpark using pip
!pip install pyspark

# Set environment variables for Spark
!export SPARK_HOME=/path/to/spark
!export PATH=$SPARK_HOME/bin:$PATH
!export PYSPARK_PYTHON=python3

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=3e9457998d6588df1dade26c77f847cf8ecc0e1194d30bc732ba54de23849770
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# Basic Operations and Transformations

## Creating a Spark Session:



In [3]:
from pyspark.sql import SparkSession

# Configure the builder with this notebook's application name, then obtain
# the session from it (getOrCreate reuses an already-running session).
session_builder = SparkSession.builder.appName("BasicOperations")
spark = session_builder.getOrCreate()

## Loading Data:

In [4]:
# Read dataset.csv into a DataFrame: the first line supplies the column
# names and column types are inferred from the data.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("dataset.csv")

## DataFrame Operations:

In [5]:
# Preview: first five rows of the DataFrame
df.show(5)

# Column names and types
df.printSchema()

# Projection: keep only the two identifier columns
id_columns = df.select(["transaction_id", "customer_id"])
id_columns.show()

# Row filter: transactions with more than one unit
multi_unit = df.where(df.quantity > 1)
multi_unit.show()

# Aggregation: number of transactions per product
per_product_counts = df.groupBy("product_id").count()
per_product_counts.show()

+--------------+-----------+----------------+----------+--------+-----+
|transaction_id|customer_id|transaction_date|product_id|quantity|price|
+--------------+-----------+----------------+----------+--------+-----+
|             1|       1001|      2023-01-01|      2001|       2| 20.0|
|             2|       1002|      2023-01-02|      2003|       1| 15.0|
|             3|       1001|      2023-01-03|      2002|       3| 10.0|
|             4|       1003|      2023-01-04|      2001|       1| 20.0|
|             5|       1002|      2023-01-05|      2003|       2| 15.0|
+--------------+-----------+----------------+----------+--------+-----+

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+--------------+-----------+
|transaction_id|customer_id|
+--------------+-----------+


## DataFrame Transformations:

In [6]:
# Apply the three transformations as one chain; each step returns a new
# DataFrame and the final result is bound back to `df`.
df = (
    # total_amount = quantity * price for every row
    df.withColumn("total_amount", df["quantity"] * df["price"])
    # remove the product identifier column
    .drop("product_id")
    # shorten the date column's name
    .withColumnRenamed("transaction_date", "date")
)

# Machine Learning with PySpark

In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Feature engineering: pack the numeric inputs into the single vector column
# that Spark ML estimators expect.
assembler = VectorAssembler(inputCols=["quantity", "price"], outputCol="features")
data = assembler.transform(df)

# Split the data into training and test sets.
# A fixed seed keeps the split (and therefore the fitted model and the
# predictions shown below) the same on every Restart & Run All.
SPLIT_SEED = 42
train_data, test_data = data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Initialize a linear regression model.
# Note: the label total_amount was built as quantity * price, which is not a
# linear function of the two features, so the fitted model can only
# approximate it and predictions will not match the label exactly.
lr = LinearRegression(featuresCol="features", labelCol="total_amount")

# Fit the model on training data
lr_model = lr.fit(train_data)

# Predict on test data and compare the label with the model's output
predictions = lr_model.transform(test_data)
predictions.select("total_amount", "prediction").show()

+------------+------------------+
|total_amount|        prediction|
+------------+------------------+
|        15.0| 5.000000000000348|
|        30.0|25.000000000000107|
+------------+------------------+

