In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import random

### from pyspark.sql import SparkSession

What It Is: This imports the SparkSession class from the pyspark.sql module.



Purpose: SparkSession is the entry point for using Spark functionality. It allows you to create DataFrames, execute SQL queries, and access other Spark functionalities. It replaces the older SparkContext and SQLContext in newer versions of PySpark.


### from pyspark.sql import functions as F

What It Is: This imports the functions module from pyspark.sql and gives it the alias F.



Purpose: The functions module contains many built-in functions for performing operations on DataFrames, such as F.col(), 

#### F.lit(), F.concat(), and others. Using the alias F makes the code cleaner and shorter.

    F.lit(value)

        What It Is: Creates a column containing a constant value.

        
        Purpose: Used to add a fixed value as a new column or in operations with other columns.
            
    
    F.col(column_name)

        What It Is: Refers to an existing column in a DataFrame.


        Purpose: Allows you to perform operations or transformations on that specific column.


    F.concat(*cols)

        What It Is: Combines multiple columns into a single column of strings.


        Purpose: Used to merge values from different columns into one column, typically for concatenating text data.

### from pyspark.sql.types import IntegerType, StringType, StructType, StructField

What It Is: This imports data type classes and schema definition classes from pyspark.sql.types.


Purpose:


IntegerType: Defines a column of integer type in a DataFrame schema.


StringType: Defines a column of string type in a DataFrame schema.


StructType: Represents a schema for a DataFrame, where you can define the structure of the DataFrame by specifying the types of each column.


StructField: Defines a single field (column) in a StructType, including its name and type.
### from pyspark.ml.feature import VectorAssembler, StringIndexer

What It Is: This imports classes from the pyspark.ml.feature module.
Purpose:


VectorAssembler: Combines multiple columns into a single vector column, which is often required for machine learning algorithms that expect features in a vector format.


StringIndexer: Converts categorical string columns into numerical indices, which is useful for algorithms that require numeric inputs.
### from pyspark.ml.classification import RandomForestClassifier

What It Is: This imports the RandomForestClassifier class from the pyspark.ml.classification module.


Purpose: RandomForestClassifier is a machine learning algorithm for classification tasks. It builds multiple decision trees and combines their results to improve accuracy and control over-fitting.
### from pyspark.ml.evaluation import MulticlassClassificationEvaluator

What It Is: This imports the MulticlassClassificationEvaluator class from the pyspark.ml.evaluation module.


Purpose: MulticlassClassificationEvaluator is used to evaluate the performance of a multiclass classification model. It provides metrics such as accuracy, precision, recall, and F1 score.
### import random

What It Is: This imports the random module from the Python standard library.


Purpose: The random module provides functions to generate random numbers, perform random selections, and shuffle sequences. It is often used for tasks such as data sampling, splitting datasets, or adding randomness to experiments.

In [22]:
# Create a Spark session
# This initializes a Spark application with the name "PySpark Example"
spark = SparkSession.builder.appName("PySpark Example").getOrCreate()

# Define a schema for the DataFrame
# The schema specifies the column names and data types for the DataFrame
schema = StructType([
    StructField("Name", StringType(), True),       # Column for names (strings), nullable
    StructField("Age", IntegerType(), True),        # Column for age (integers), nullable
    StructField("Country", StringType(), True),     # Column for country (strings), nullable
    StructField("Score", IntegerType(), True),      # Column for score (integers), nullable
    StructField("Label", IntegerType(), True)       # Column for label (integers), nullable (used for classification)
])

# Generate a large dataset (100,000 rows) with random data
# The data consists of random values for each of the columns defined in the schema
names = ["Alice", "Bob", "Cathy", "David", "Eva", "Frank", "Grace", "Helen"]  # Possible names
countries = ["USA", "Canada", "UK", "Germany", "France", "India", "China", "Japan"]  # Possible countries

# Create a list of tuples with random data
# Each tuple represents a row in the dataset
data = [(random.choice(names),                         # Random name
         random.randint(18, 60),                      # Random age between 18 and 60
         random.choice(countries),                    # Random country
         random.randint(50, 100),                     # Random score between 50 and 100
         1 if random.randint(50, 100) > 80 else 0)    # Label: 1 if score > 80, otherwise 0
        for _ in range(100000)]                        # Generate 100,000 such tuples

# Create a DataFrame with the generated data and defined schema
# This converts the list of tuples into a Spark DataFrame using the schema
df = spark.createDataFrame(data, schema)

# Show the first 5 rows of the DataFrame
# This displays a sample of the data to the console for verification
df.show(5)

+-----+---+-------+-----+-----+
| Name|Age|Country|Score|Label|
+-----+---+-------+-----+-----+
|  Bob| 35| France|   87|    0|
|  Bob| 18|  India|   82|    0|
|Alice| 30|  Japan|   72|    0|
|Grace| 52|  India|   79|    0|
|David| 30|  China|   88|    0|
+-----+---+-------+-----+-----+
only showing top 5 rows



In [5]:
# Get the number of rows in the DataFrame
row_count = df.count()
print(f"Number of rows: {row_count}")




Number of rows: 100000


                                                                                

In [6]:
# Select specific columns
selected_df = df.select("Name", "Country", "Score")
selected_df.show(5)


+-----+-------+-----+
| Name|Country|Score|
+-----+-------+-----+
|Frank|     UK|   81|
|Cathy|  China|   90|
|David| Canada|   93|
|Frank| Canada|   87|
|Frank|     UK|   82|
+-----+-------+-----+
only showing top 5 rows



In [7]:
# Filter the DataFrame where Score > 80
filtered_df = df.filter(df["Score"] > 80)
filtered_df.show(5)

+-----+---+-------+-----+-----+
| Name|Age|Country|Score|Label|
+-----+---+-------+-----+-----+
|Frank| 55|     UK|   81|    1|
|Cathy| 51|  China|   90|    0|
|David| 59| Canada|   93|    0|
|Frank| 32| Canada|   87|    0|
|Frank| 32|     UK|   82|    0|
+-----+---+-------+-----+-----+
only showing top 5 rows



In [8]:
# Group by "Country" column and calculate the average of the "Score" column
grouped_df = df.groupBy("Country").agg(F.avg("Score").alias("Average_Score"))

# Show the resulting DataFrame with the average scores for each country
grouped_df.show()


[Stage 6:===>                                                     (1 + 15) / 16]

+-------+-----------------+
|Country|    Average_Score|
+-------+-----------------+
|Germany|74.91144501278772|
| France|74.85205479452055|
|  China|74.86080643888582|
|  India|74.89501438159157|
|    USA|75.02417353718081|
|     UK|75.24174222797927|
| Canada|75.14404476418865|
|  Japan|75.16355513004628|
+-------+-----------------+



                                                                                

In [9]:
# Add a new column named 'New_Score' to the DataFrame
# The value of 'New_Score' is calculated by adding 10 to the existing 'Score' column
transformed_df = df.withColumn("New_Score", df["Score"] + 10)

# Display the first 5 rows of the DataFrame with the new column added
transformed_df.show(5)

+-----+---+-------+-----+-----+---------+
| Name|Age|Country|Score|Label|New_Score|
+-----+---+-------+-----+-----+---------+
|Frank| 55|     UK|   81|    1|       91|
|Cathy| 51|  China|   90|    0|      100|
|David| 59| Canada|   93|    0|      103|
|Frank| 32| Canada|   87|    0|       97|
|Frank| 32|     UK|   82|    0|       92|
+-----+---+-------+-----+-----+---------+
only showing top 5 rows



In [10]:
# Sort the DataFrame by the 'Score' column in descending order
# The 'orderBy' function is used to sort the DataFrame based on the specified column(s)
# 'df["Score"].desc()' indicates that the sorting should be in descending order
sorted_df = df.orderBy(df["Score"].desc())

# Display the first 5 rows of the sorted DataFrame
# This shows the top 5 rows after sorting, with the highest 'Score' values appearing first
sorted_df.show(5)




+-----+---+-------+-----+-----+
| Name|Age|Country|Score|Label|
+-----+---+-------+-----+-----+
|  Bob| 35|     UK|  100|    0|
|Cathy| 37|  Japan|  100|    0|
|Frank| 32|  China|  100|    1|
|David| 24|     UK|  100|    0|
|  Eva| 57|  China|  100|    0|
+-----+---+-------+-----+-----+
only showing top 5 rows



                                                                                

In [11]:

# Drop rows with missing values (if any)
cleaned_df = df.dropna()
cleaned_df.show(5)

+-----+---+-------+-----+-----+
| Name|Age|Country|Score|Label|
+-----+---+-------+-----+-----+
|Frank| 55|     UK|   81|    1|
|Cathy| 51|  China|   90|    0|
|David| 59| Canada|   93|    0|
|Frank| 32| Canada|   87|    0|
|Frank| 32|     UK|   82|    0|
+-----+---+-------+-----+-----+
only showing top 5 rows



In [12]:

# Write the DataFrame to a CSV file
cleaned_df.write.csv("output_dataset.csv", header=True)

                                                                                

In [23]:
# Prepare the data for machine learning by converting categorical values to numeric values

# Create a StringIndexer object
# This will map categorical values in the 'Country' column to numeric indices
indexer = StringIndexer(inputCol="Country", outputCol="CountryIndex")

# Fit the StringIndexer model to the DataFrame and transform the data
# The 'fit' method learns the mapping from categorical values to numeric indices
# The 'transform' method applies this mapping to create a new column 'CountryIndex' in the DataFrame
df_indexed = indexer.fit(df).transform(df)

                                                                                

In [14]:
# Create a VectorAssembler
# inputCols: List of columns to combine into one vector
# outputCol: Name of the new column that will store the feature vectors
assembler = VectorAssembler(inputCols=["Age", "CountryIndex", "Score"], outputCol="features")

# Transform the DataFrame
# The transform method combines the 'Age', 'CountryIndex', and 'Score' columns into a single vector column named 'features'
df_features = assembler.transform(df)

# Display the resulting DataFrame to see the new 'features' column
df_features.show()

In [15]:
# Check if 'Label' column exists in df_features
print("Columns in DataFrame:", df_features.columns)


Columns in DataFrame: ['Name', 'Age', 'Country', 'Score', 'Label', 'CountryIndex', 'features']


In [16]:

# Split the data into training and test sets
(training_data, test_data) = df_features.randomSplit([0.8, 0.2], seed=1234)

In [17]:
# Initialize the RandomForestClassifier
rf = RandomForestClassifier(labelCol="Label", featuresCol="features", numTrees=10)

In [18]:
# Train the model
model = rf.fit(training_data)

                                                                                

In [19]:
# Make predictions
predictions = model.transform(test_data)

In [20]:
# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="Label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")




Accuracy: 0.6042640921204807


                                                                                

In [21]:
# Show some prediction results
predictions.select("Name", "Age", "Country", "Score", "Label", "prediction").show(5)

+-----+---+-------+-----+-----+----------+
| Name|Age|Country|Score|Label|prediction|
+-----+---+-------+-----+-----+----------+
|Alice| 18| Canada|   69|    1|       0.0|
|Alice| 18| France|   59|    1|       0.0|
|Alice| 19|  India|   64|    1|       0.0|
|Alice| 19|  Japan|   62|    1|       0.0|
|Alice| 19|  Japan|   80|    0|       0.0|
+-----+---+-------+-----+-----+----------+
only showing top 5 rows



In [None]:




















# Stop the Spark session
spark.stop()