# **Importing Libraries and Data**

In [None]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=bf22ed024338d0d0d192a43fbe45762e6815d46b75ccbe00a2cd246c1ccb5dfc
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
# Mount Google Drive inside the Colab runtime; the dataset is read from a
# path under /content/drive in a later cell.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Get the active Spark session, or start one named "playstore".
spark = (
    SparkSession.builder
    .appName("playstore")
    .getOrCreate()
)


In [None]:
# Load the Play Store CSV using its first line as the header. Only the header
# option is set, so every column arrives as a string (see the printSchema
# output further down); numeric columns are cast explicitly later.
# NOTE(review): the per-column null counts printed below grow from left to
# right, which may point to rows being split by embedded newlines/quotes --
# TODO confirm whether multiLine/escape options are needed for this file.
playstore_csv_path = '/content/drive/MyDrive/big data and iot/Dataset/Playstore_final.csv'
data = spark.read.option('header', 'true').csv(playstore_csv_path)

In [None]:
# Print a tabular preview of the raw frame to inspect the available columns.
data.show()

+--------------------+--------------------+----------------+------------+--------------------+-------------------+----------------+----+-----+--------+----+---------------+-------------------+--------------------+--------------------+------------+-----------------+--------------------+--------------+------------+----------------+-------------+--------------------+-------+--------------------+------------+--------------------+---------------------+-------+
|            App Name|              App Id|        Category|      Rating|        Rating Count|           Installs|Minimum Installs|Free|Price|Currency|Size|Minimum Android|       Developer Id|   Developer Website|     Developer Email|    Released|      Last update|      Privacy Policy|Content Rating|Ad Supported|In app purchases|Editor Choice|             Summary|Reviews|Android version Text|   Developer|   Developer Address|Developer Internal ID|Version|
+--------------------+--------------------+----------------+------------+-------

In [None]:
# Drop identifier, developer-contact, free-text and date/version columns that
# are not used by the analysis below. Each column is listed once (the original
# call named 'Version' twice).
unused_columns = [
    'App Id',
    'Developer Id',
    'Developer Website',
    'Developer',
    'Developer Email',
    'Version',
    'Privacy Policy',
    'Released',
    'Android version Text',
    'Last update',
    'Summary',
    'Developer Address',
    'Developer Internal ID',
]
data_drop = data.drop(*unused_columns)

In [None]:
# Preview the frame after the unused columns have been dropped.
data_drop.show()

+--------------------+----------------+------------+--------------------+-------------------+----------------+----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|            App Name|        Category|      Rating|        Rating Count|           Installs|Minimum Installs|Free|Price|Currency|Size|Minimum Android|Content Rating|Ad Supported|In app purchases|Editor Choice|Reviews|
+--------------------+----------------+------------+--------------------+-------------------+----------------+----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|Logistics Management|       Education|    4.090909|                  66|            10,000+|           10000|TRUE|    0|     USD|5.8M|              4|      Everyone|        TRUE|           FALSE|        FALSE|     28|
|Estados Unidos No...|News & Magazines|           4|                   8|             1,000+|            1000|TRUE|    0|   

# **Dataset Preprocessing**

In [None]:
from pyspark.sql.functions import col, count, when

# Count null values in each column of the trimmed frame. The counts are taken
# from `data_drop` itself (the original loop filtered the untrimmed `data`
# frame) and are computed in a single aggregation instead of one Spark job per
# column: count() skips nulls, so counting when(isNull, 1) tallies exactly the
# null cells of a column.
null_counts = data_drop.select(
    [count(when(col(column).isNull(), 1)).alias(column) for column in data_drop.columns]
).first()

for column in data_drop.columns:
    print(f"Column '{column}': {null_counts[column]} null values")

Column 'App Name': 41 null values
Column 'Category': 139023 null values
Column 'Rating': 225328 null values
Column 'Rating Count': 255566 null values
Column 'Installs': 261580 null values
Column 'Minimum Installs': 262590 null values
Column 'Free': 268718 null values
Column 'Price': 269552 null values
Column 'Currency': 269792 null values
Column 'Size': 269925 null values
Column 'Minimum Android': 270003 null values
Column 'Content Rating': 270016 null values
Column 'Ad Supported': 270016 null values
Column 'In app purchases': 270016 null values
Column 'Editor Choice': 270016 null values
Column 'Reviews': 278787 null values


In [None]:
from pyspark.sql.functions import col, count, when

# Drop every row that has a null in any of the retained columns.
data_null_removed = data_drop.dropna()

# Verify the result against the cleaned frame's own columns. The original loop
# iterated over `data.columns`, so it also reported nulls for columns that had
# already been dropped and are irrelevant to this check.
remaining_nulls = data_null_removed.select(
    [count(when(col(column).isNull(), 1)).alias(column) for column in data_null_removed.columns]
).first()

for column in data_null_removed.columns:
    print(f"Column '{column}': {remaining_nulls[column]} null values")


Column 'App Name': 0 null values
Column 'App Id': 0 null values
Column 'Category': 0 null values
Column 'Rating': 0 null values
Column 'Rating Count': 0 null values
Column 'Installs': 0 null values
Column 'Minimum Installs': 0 null values
Column 'Free': 0 null values
Column 'Price': 0 null values
Column 'Currency': 0 null values
Column 'Size': 0 null values
Column 'Minimum Android': 0 null values
Column 'Developer Id': 2 null values
Column 'Developer Website': 0 null values
Column 'Developer Email': 0 null values
Column 'Released': 0 null values
Column 'Last update': 0 null values
Column 'Privacy Policy': 0 null values
Column 'Content Rating': 0 null values
Column 'Ad Supported': 0 null values
Column 'In app purchases': 0 null values
Column 'Editor Choice': 0 null values
Column 'Summary': 186 null values
Column 'Reviews': 0 null values
Column 'Android version Text': 1078 null values
Column 'Developer': 30 null values
Column 'Developer Address': 32 null values
Column 'Developer Internal

In [None]:
# Preview the frame after rows containing nulls have been removed.
data_null_removed.show()

+--------------------+----------------+---------+------------+----------+----------------+-----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|            App Name|        Category|   Rating|Rating Count|  Installs|Minimum Installs| Free|Price|Currency|Size|Minimum Android|Content Rating|Ad Supported|In app purchases|Editor Choice|Reviews|
+--------------------+----------------+---------+------------+----------+----------------+-----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|Logistics Management|       Education| 4.090909|          66|   10,000+|           10000| TRUE|    0|     USD|5.8M|              4|      Everyone|        TRUE|           FALSE|        FALSE|     28|
|    Dental Assistant|       Education|3.8666666|          15|   10,000+|           10000| TRUE|    0|     USD|5.7M|              4|      Everyone|        TRUE|           FALSE|        FALSE|      3|


**Remove "+" and "," from the Installs column**

In [None]:
from pyspark.sql.functions import col, regexp_replace

# Strip every "+" and "," from Installs (e.g. "10,000+" -> "10000") so the
# column can be cast to a number later.
installs_digits_only = regexp_replace(col("Installs"), "[+,]", "")
data_regexp_replace = data_null_removed.withColumn("Installs", installs_digits_only)

data_regexp_replace.show()


+--------------------+----------------+---------+------------+--------+----------------+-----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|            App Name|        Category|   Rating|Rating Count|Installs|Minimum Installs| Free|Price|Currency|Size|Minimum Android|Content Rating|Ad Supported|In app purchases|Editor Choice|Reviews|
+--------------------+----------------+---------+------------+--------+----------------+-----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|Logistics Management|       Education| 4.090909|          66|   10000|           10000| TRUE|    0|     USD|5.8M|              4|      Everyone|        TRUE|           FALSE|        FALSE|     28|
|    Dental Assistant|       Education|3.8666666|          15|   10000|           10000| TRUE|    0|     USD|5.7M|              4|      Everyone|        TRUE|           FALSE|        FALSE|      3|
|   Medica

**Remove the "M" suffix from the Size column**

In [None]:
from pyspark.sql.functions import col, regexp_replace

# Remove every "M" from Size (e.g. "5.8M" -> "5.8").
# NOTE(review): only the letter "M" is handled. Size values that are still
# non-numeric after this step become null when the column is cast to double
# (the post-imputation null check reports nulls in Size) -- TODO confirm which
# other unit suffixes or text values occur and whether they should be converted.
size_without_suffix = regexp_replace(col("Size"), "M", "")
data_regexp_replace2 = data_regexp_replace.withColumn("Size", size_without_suffix)

data_regexp_replace2.show()

+--------------------+----------------+---------+------------+--------+----------------+-----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|            App Name|        Category|   Rating|Rating Count|Installs|Minimum Installs| Free|Price|Currency|Size|Minimum Android|Content Rating|Ad Supported|In app purchases|Editor Choice|Reviews|
+--------------------+----------------+---------+------------+--------+----------------+-----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|Logistics Management|       Education| 4.090909|          66|   10000|           10000| TRUE|    0|     USD| 5.8|              4|      Everyone|        TRUE|           FALSE|        FALSE|     28|
|    Dental Assistant|       Education|3.8666666|          15|   10000|           10000| TRUE|    0|     USD| 5.7|              4|      Everyone|        TRUE|           FALSE|        FALSE|      3|
|   Medica

**Remove rows with "N/A"**

In [None]:
from pyspark.sql.functions import col

# Start from the frame with cleaned Installs/Size and narrow it column by
# column: a row survives only if none of its cells is the placeholder "N/A"
# (with or without a leading space).
data_remove_nan = data_regexp_replace2

for name in data_remove_nan.columns:
    is_placeholder = col(name).isin("N/A", " N/A")
    data_remove_nan = data_remove_nan.where(~is_placeholder)

In [None]:
# Preview the frame after rows containing "N/A" placeholders were removed.
data_remove_nan.show()

+--------------------+----------------+---------+------------+--------+----------------+----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|            App Name|        Category|   Rating|Rating Count|Installs|Minimum Installs|Free|Price|Currency|Size|Minimum Android|Content Rating|Ad Supported|In app purchases|Editor Choice|Reviews|
+--------------------+----------------+---------+------------+--------+----------------+----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|Logistics Management|       Education| 4.090909|          66|   10000|           10000|TRUE|    0|     USD| 5.8|              4|      Everyone|        TRUE|           FALSE|        FALSE|     28|
|    Dental Assistant|       Education|3.8666666|          15|   10000|           10000|TRUE|    0|     USD| 5.7|              4|      Everyone|        TRUE|           FALSE|        FALSE|      3|
|   Medical Ass

**Show the counts of "N/A" in each column**

In [None]:
from pyspark.sql import functions as F

# Count the remaining placeholder cells per column. Both spellings removed by
# the filtering cell ("N/A" and " N/A") are checked; the original counted only
# "N/A". The functions are accessed through the `F` namespace so that Python's
# builtin `sum` is not shadowed for the rest of the notebook.
na_tokens = ["N/A", " N/A"]
na_counts = data_remove_nan.select(
    [
        F.sum(F.when(F.col(c).isin(na_tokens), 1).otherwise(0)).alias(c)
        for c in data_remove_nan.columns
    ]
)

# Display the counts of "N/A" in each column (all zeros are expected here)
na_counts.show()


+--------+--------+------+------------+--------+----------------+----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|App Name|Category|Rating|Rating Count|Installs|Minimum Installs|Free|Price|Currency|Size|Minimum Android|Content Rating|Ad Supported|In app purchases|Editor Choice|Reviews|
+--------+--------+------+------------+--------+----------------+----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+
|       0|       0|     0|           0|       0|               0|   0|    0|       0|   0|              0|             0|           0|               0|            0|      0|
+--------+--------+------+------------+--------+----------------+----+-----+--------+----+---------------+--------------+------------+----------------+-------------+-------+



# **Label Encoding Categorical Columns**

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Categorical columns to label-encode
categorical_cols = ["Category", "Free", "Content Rating", "Ad Supported", "In app purchases", "Editor Choice","Currency"]

# Original column name -> name of its encoded counterpart ("<name>_index")
column_name_mapping = {name: f"{name}_index" for name in categorical_cols}

# One StringIndexer stage per categorical column, in the order listed above
stages = [
    StringIndexer(inputCol=source_name, outputCol=encoded_name)
    for source_name, encoded_name in column_name_mapping.items()
]

# Chain the indexers, learn the label mappings, then apply them
pipeline = Pipeline(stages=stages)
encoder_model = pipeline.fit(data_remove_nan)
data_encoded = encoder_model.transform(data_remove_nan)

# Keep only the encoded versions of the categorical columns
columns_to_drop = categorical_cols
data_encoded = data_encoded.drop(*columns_to_drop)

# Preview the label-encoded frame
data_encoded.show()


+--------------------+---------+------------+--------+----------------+-----+----+---------------+-------+--------------+----------+--------------------+------------------+----------------------+-------------------+--------------+
|            App Name|   Rating|Rating Count|Installs|Minimum Installs|Price|Size|Minimum Android|Reviews|Category_index|Free_index|Content Rating_index|Ad Supported_index|In app purchases_index|Editor Choice_index|Currency_index|
+--------------------+---------+------------+--------+----------------+-----+----+---------------+-------+--------------+----------+--------------------+------------------+----------------------+-------------------+--------------+
|Logistics Management| 4.090909|          66|   10000|           10000|    0| 5.8|              4|     28|           0.0|       0.0|                 0.0|               0.0|                   0.0|                0.0|           0.0|
|    Dental Assistant|3.8666666|          15|   10000|           10000|    0

**Check null values before imputing missing values with the mean**

In [None]:
# Null count per column of the encoded frame. The numeric columns are still
# strings at this point, so values that cannot be parsed as numbers are not
# yet visible as nulls.
encoded_null_counts = {
    name: data_encoded.where(col(name).isNull()).count()
    for name in data_encoded.columns
}
for name, missing in encoded_null_counts.items():
    print(f"Column '{name}': {missing} null values")


Column 'App Name': 0 null values
Column 'Rating': 0 null values
Column 'Rating Count': 0 null values
Column 'Installs': 0 null values
Column 'Minimum Installs': 0 null values
Column 'Price': 0 null values
Column 'Size': 0 null values
Column 'Minimum Android': 0 null values
Column 'Reviews': 0 null values
Column 'Category_index': 0 null values
Column 'Free_index': 0 null values
Column 'Content Rating_index': 0 null values
Column 'Ad Supported_index': 0 null values
Column 'In app purchases_index': 0 null values
Column 'Editor Choice_index': 0 null values
Column 'Currency_index': 0 null values


In [None]:
# Inspect the column types: the original numeric columns are still strings,
# only the *_index columns are doubles.
data_encoded.printSchema()

root
 |-- App Name: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Rating Count: string (nullable = true)
 |-- Installs: string (nullable = true)
 |-- Minimum Installs: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Minimum Android: string (nullable = true)
 |-- Reviews: string (nullable = true)
 |-- Category_index: double (nullable = false)
 |-- Free_index: double (nullable = false)
 |-- Content Rating_index: double (nullable = false)
 |-- Ad Supported_index: double (nullable = false)
 |-- In app purchases_index: double (nullable = false)
 |-- Editor Choice_index: double (nullable = false)
 |-- Currency_index: double (nullable = false)



**Convert the columns to Double**

In [None]:
from pyspark.sql.functions import col

# String columns that hold numeric values and must become DoubleType
columns_to_convert = ["Rating", "Rating Count", "Installs", "Minimum Installs", "Price", "Size", "Minimum Android", "Reviews"]

# Replace each listed column by its double-typed version in one call. Values
# that cannot be parsed as a number turn into null (the later null check
# reports such nulls for Size and Minimum Android).
data_encoded = data_encoded.withColumns(
    {name: col(name).cast("double") for name in columns_to_convert}
)

# Confirm the new column types
data_encoded.printSchema()


root
 |-- App Name: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Rating Count: double (nullable = true)
 |-- Installs: double (nullable = true)
 |-- Minimum Installs: double (nullable = true)
 |-- Price: double (nullable = true)
 |-- Size: double (nullable = true)
 |-- Minimum Android: double (nullable = true)
 |-- Reviews: double (nullable = true)
 |-- Category_index: double (nullable = false)
 |-- Free_index: double (nullable = false)
 |-- Content Rating_index: double (nullable = false)
 |-- Ad Supported_index: double (nullable = false)
 |-- In app purchases_index: double (nullable = false)
 |-- Editor Choice_index: double (nullable = false)
 |-- Currency_index: double (nullable = false)



**Imputing Missing Values with Mean**

In [None]:
from pyspark.ml.feature import Imputer

# Columns whose nulls are replaced by the column mean.
# NOTE(review): "Size" and "Minimum Android" are not in this list, yet they
# are the only columns the null check after this cell reports nulls for; their
# incomplete rows are removed by a later dropna instead of being imputed.
# Confirm that this is intended.
columns_with_null = ["Rating", "Rating Count", "Installs", "Minimum Installs", "Price", "Reviews"]

# Mean imputer that writes the result back into the same column names
imputer = Imputer(
    strategy="mean",
    inputCols=columns_with_null,
    outputCols=columns_with_null,
)

# Learn the column means, then fill the nulls
imputer_model = imputer.fit(data_encoded)
data_imputed = imputer_model.transform(data_encoded)

# The 'data_imputed' DataFrame now contains imputed values for missing data, and column names remain the same.


In [None]:
# Null count per column after imputation; columns outside the imputed list
# can still contain nulls.
imputed_null_counts = {
    name: data_imputed.where(col(name).isNull()).count()
    for name in data_imputed.columns
}
for name, missing in imputed_null_counts.items():
    print(f"Column '{name}': {missing} null values")


Column 'App Name': 0 null values
Column 'Rating': 0 null values
Column 'Rating Count': 0 null values
Column 'Installs': 0 null values
Column 'Minimum Installs': 0 null values
Column 'Price': 0 null values
Column 'Size': 10937 null values
Column 'Minimum Android': 23296 null values
Column 'Reviews': 0 null values
Column 'Category_index': 0 null values
Column 'Free_index': 0 null values
Column 'Content Rating_index': 0 null values
Column 'Ad Supported_index': 0 null values
Column 'In app purchases_index': 0 null values
Column 'Editor Choice_index': 0 null values
Column 'Currency_index': 0 null values


In [None]:
# Confirm that imputation kept the column names and double types unchanged.
data_imputed.printSchema()

root
 |-- App Name: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Rating Count: double (nullable = true)
 |-- Installs: double (nullable = true)
 |-- Minimum Installs: double (nullable = true)
 |-- Price: double (nullable = true)
 |-- Size: double (nullable = true)
 |-- Minimum Android: double (nullable = true)
 |-- Reviews: double (nullable = true)
 |-- Category_index: double (nullable = false)
 |-- Free_index: double (nullable = false)
 |-- Content Rating_index: double (nullable = false)
 |-- Ad Supported_index: double (nullable = false)
 |-- In app purchases_index: double (nullable = false)
 |-- Editor Choice_index: double (nullable = false)
 |-- Currency_index: double (nullable = false)



# **Applying Linear Regression**

**Select Independent and Target Columns**

In [None]:
# Numeric predictors taken directly from the dataset
numeric_feature_cols = [
    "Rating Count",
    "Installs",
    "Minimum Installs",
    "Price",
    "Size",
    "Minimum Android",
    "Reviews",
]

# Label-encoded categorical predictors
encoded_feature_cols = [
    "Category_index",
    "Free_index",
    "Content Rating_index",
    "Ad Supported_index",
    "In app purchases_index",
    "Editor Choice_index",
    "Currency_index",
]

# Independent feature columns, numeric first, then encoded.
# NOTE(review): "Installs" and "Minimum Installs" receive identical
# coefficients in the fitted model printed below and look identical in the
# previews -- presumably redundant; consider keeping only one. TODO confirm.
independent_cols = numeric_feature_cols + encoded_feature_cols

# Column the model is trained to predict
target_col = "Rating"


In [None]:
# Keep only rows that are complete in every feature and in the target; this
# is the step that removes the rows still null in Size / Minimum Android.
model_columns = [*independent_cols, target_col]
data_imputed = data_imputed.na.drop(subset=model_columns)


**Create a Vector Assembler**

In [None]:
# Pack the independent columns into a single vector column named
# "Independent_Features", which the regression reads as its input.
feature_assembler = VectorAssembler(
    inputCols=independent_cols,
    outputCol="Independent_Features",
)
output = feature_assembler.transform(data_imputed)


In [None]:
# Keep only what the model needs: the assembled feature vector and the
# target column ("Rating").
final_output = output.select("Independent_Features", target_col)


**Split the data into training and testing sets**

In [None]:
# 80 % of the rows go to training and 20 % to testing; the seed is fixed so
# the split is requested identically on every run.
split_weights = [0.8, 0.2]
train_data, test_data = final_output.randomSplit(split_weights, seed=12345)


In [None]:
# Configure (not yet fit) a linear regression that reads the assembled
# "Independent_Features" vector and predicts the target column.
regression_algorithm = LinearRegression(featuresCol="Independent_Features", labelCol=target_col)


**Fit the model to the training data**

In [None]:
# Fit the model to the training split only; the test split is held back
# for the prediction step below.
regression_model = regression_algorithm.fit(train_data)


**See the coefficients and intercept**

In [None]:
# Read the fitted parameters from the model. The printed coefficient vector
# has 14 entries -- presumably one per entry of independent_cols, in that
# order (TODO confirm).
coefficients = regression_model.coefficients
intercept = regression_model.intercept


In [None]:
# Show the coefficients and the intercept. The original cell announced both
# but printed only the coefficients.
print("Coefficients: " + str(coefficients))
print("Intercept: " + str(intercept))

Coefficients: [1.0019529502596373e-07,-2.0316277328000334e-10,-2.0316277328000334e-10,-0.0006193485231581268,7.041697540319723e-05,0.02851616489910707,-1.2464742473050257e-07,-0.006190436253619568,-0.009618650586208049,-0.01806516191774634,0.0,0.02790753166843399,0.1806734619808791,0.027858980477251456]


**Make predictions on the test data**

In [None]:
# Apply the fitted model to the held-out test split; the result gains a
# "prediction" column (selected in the next cell).
predict_result = regression_model.transform(test_data)


**Show the predictions**

In [None]:
# Show the feature vector, the actual rating and the predicted rating
# side by side.
prediction_columns = ["Independent_Features", target_col, "prediction"]
predict_result.select(prediction_columns).show()


+--------------------+---------+------------------+
|Independent_Features|   Rating|        prediction|
+--------------------+---------+------------------+
|(14,[0,1,2,3,4,5,...|      4.5|  4.15922960340566|
|(14,[0,1,2,3,4,5,...|     3.36| 4.171236221633009|
|(14,[0,1,2,3,4,5,...|      3.7| 4.161988279658477|
|(14,[0,1,2,3,4,5,...|4.3366337| 4.173640820026883|
|(14,[0,1,2,3,4,5,...|4.2954545| 4.172188730156745|
|(14,[0,1,2,3,4,5,...|4.3989525| 4.172761751674257|
|(14,[0,1,2,3,4,5,...|      4.8|4.0542581764049865|
|(14,[0,1,2,4,5],[...|      4.2| 4.182561399552659|
|(14,[0,1,2,4,5,6]...|      4.2| 4.172668619300984|
|(14,[0,1,2,4,5,6]...|      4.2| 4.175583486421332|
|(14,[0,1,2,4,5,6]...|      4.0|  4.19843866852586|
|(14,[0,1,2,4,5,6]...|      4.6| 4.173443081382994|
|(14,[0,1,2,4,5,6]...|      4.4| 4.175837632488977|
|(14,[0,1,2,4,5,6]...|      4.8|  4.14125830054908|
|(14,[0,1,2,4,5,6]...|      4.6| 4.175583448538538|
|(14,[0,1,2,4,5,6]...|      5.0| 4.172767040536329|
|(14,[0,1,2,