In [1]:
# Install PySpark using pip
# !pip install pyspark

## Setting Up the Environment

In [3]:
from pyspark.sql import SparkSession

In [4]:
import os

# Set environment variables for PySpark
os.environ['PYSPARK_PYTHON'] = 'python'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'

In [5]:
# Initialize Spark Session in local mode
spark = SparkSession.builder \
    .appName("PySpark Learning") \
    .master("local[*]") \
    .config("spark.network.timeout", "600s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .getOrCreate()

# Verify the Spark Session
print("Spark Version:", spark.version)

Spark Version: 3.5.1


## Working with RDDs

RDDs are the low-level data structures in Spark. Let's create and perform operations on RDDs.

In [8]:
# Create an RDD
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)

# Perform transformations and actions
rdd_squared = rdd.map(lambda x: x * x)
print(rdd_squared.collect())

[1, 4, 9, 16, 25]


## Creating and Manipulating DataFrames

In [16]:
from pyspark import Row

In [22]:
# create dataframe
data = [("Datta", 25), ("Poonam", 29), ("Jyoti", 31), ("Swati", 32), ("Nilam", 33)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)
df.show()

+------+---+
|  Name|Age|
+------+---+
| Datta| 25|
|Poonam| 29|
| Jyoti| 31|
| Swati| 32|
| Nilam| 33|
+------+---+



In [24]:
# Perform transformations
df_filtered = df.filter(df["Age"] > 30)
df_filtered.show()

+-----+---+
| Name|Age|
+-----+---+
|Jyoti| 31|
|Swati| 32|
|Nilam| 33|
+-----+---+



In [26]:
# Group and aggregate
df_grouped = df.groupBy("Age").count()
df_grouped.show()

+---+-----+
|Age|count|
+---+-----+
| 25|    1|
| 29|    1|
| 31|    1|
| 32|    1|
| 33|    1|
+---+-----+



## Reading Data from a CSV File

In [29]:
df_csv = spark.read.csv(r"C:\Users\datta\OneDrive\Desktop\Predixion.ai Data Engineer Assignment\5k_borrowers_data.csv", 
                        header=True, 
                        inferSchema=True)

In [42]:
df_csv.show()

+-------------------+--------------------+----------+--------------+-------------+--------------------+--------------------+-------------------+---------------------+----------------+---------------+--------------------+--------------------+--------------------+-----------------+--------+---------------+--------------------+--------------------+----------------------------+---------------+
|               Name|       Date of Birth|    Gender|Marital Status| Phone Number|       Email Address|     Mailing Address|Language Preference|Geographical Location|    Credit Score|      Loan Type|         Loan Amount|           Loan Term|       Interest Rate|     Loan Purpose|     EMI|     IP Address|         Geolocation|   Repayment History|Days Left to Pay Current EMI|Delayed Payment|
+-------------------+--------------------+----------+--------------+-------------+--------------------+--------------------+-------------------+---------------------+----------------+---------------+---------------

In [39]:
# pd_df= df_csv.toPandas()
# pd_df

## Using Spark SQL

In [45]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

# Run SQL queries
sql_df = spark.sql("SELECT Name, Age FROM people WHERE Age > 30")
sql_df.show()

+-----+---+
| Name|Age|
+-----+---+
|Jyoti| 31|
|Swati| 32|
|Nilam| 33|
+-----+---+



## Machine Learning with PySpark

In [52]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Prepare a larger dataset
data = [(i, 2.0 * i) for i in range(1, 101)]
columns = ["x", "y"]
df_ml = spark.createDataFrame(data, columns)

# Create feature vector
assembler = VectorAssembler(inputCols=["x"], outputCol="features")
df_ml = assembler.transform(df_ml)

# Split the data
train_data, test_data = df_ml.randomSplit([0.7, 0.3])

# Check the counts of train and test datasets
print("Training Data Count: ", train_data.count())
print("Test Data Count: ", test_data.count())

# Create and train the model
lr = LinearRegression(featuresCol="features", labelCol="y")
lr_model = lr.fit(train_data)

# Evaluate the model
predictions = lr_model.transform(test_data)
predictions.select("features", "y", "prediction").show()


Training Data Count:  73
Test Data Count:  27
+--------+-----+----------+
|features|    y|prediction|
+--------+-----+----------+
|   [2.0]|  4.0|       4.0|
|   [5.0]| 10.0|      10.0|
|  [17.0]| 34.0|      34.0|
|  [19.0]| 38.0|      38.0|
|  [22.0]| 44.0|      44.0|
|  [23.0]| 46.0|      46.0|
|  [24.0]| 48.0|      48.0|
|  [27.0]| 54.0|      54.0|
|  [32.0]| 64.0|      64.0|
|  [39.0]| 78.0|      78.0|
|  [50.0]|100.0|     100.0|
|  [51.0]|102.0|     102.0|
|  [52.0]|104.0|     104.0|
|  [54.0]|108.0|     108.0|
|  [58.0]|116.0|     116.0|
|  [65.0]|130.0|     130.0|
|  [67.0]|134.0|     134.0|
|  [69.0]|138.0|     138.0|
|  [71.0]|142.0|     142.0|
|  [72.0]|144.0|     144.0|
+--------+-----+----------+
only showing top 20 rows

