<a href="https://colab.research.google.com/github/Arun-kc/Assignment/blob/main/assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Prerequisite

### Install PySpark

In [4]:
!pip install pyspark



Connect to Google Drive

In [5]:
from google.colab import drive

# Mount Google Drive into the Colab filesystem so the dataset can be
# addressed by a regular file path.
mount_point = '/content/drive'
drive.mount(mount_point)

Mounted at /content/drive


In [6]:
from pyspark.sql import SparkSession

In [7]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('spark_app')
    .getOrCreate()
)

Upload the file `google-play-dataset-by-tapivedotcom.csv` to your Google Drive and set the `csv_path` variable to its path.

In [8]:
# Path of the dataset on the mounted Drive; change it to wherever the CSV was uploaded.
csv_path = '/content/drive/MyDrive/Colab Notebooks/SampleData/google-play-dataset-by-tapivedotcom.csv'

# Read the CSV, treating the first line as the header and letting Spark
# infer the column types.
df = (
    spark.read
    .format('csv')
    .options(inferSchema=True, header=True)
    .load(csv_path)
)

### Selecting required columns for the analysis

In [9]:
# Columns kept for the analysis; every other column of the raw file is dropped.
cols = (
    "developer",
    "developerId",
    "free",
    "genre",
    "inAppProductPrice",
    "minInstalls",
    "offersIAP",
    "price",
    "ratings",
    "len screenshots",
    "adSupported",
    "containsAds",
    "reviews",
    "sale",
    "score",
    "releasedYear",
    "releasedMonth",
    "minprice",
    "maxprice",
)

df = df.select(*cols)

# Preview the reduced DataFrame.
df.show()

+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+
|           developer|         developerId|free|       genre|inAppProductPrice|minInstalls|offersIAP|price|ratings|len screenshots|adSupported|containsAds|reviews|sale|score|releasedYear|releasedMonth|      minprice|maxprice|
+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+
| Best theme workshop| 7398289236941441420|   1|Art & Design|             None|    1000000|        0|  0.0|  30650|              4|          1|          1|    152|   0| 4.75|        2014|          Sep|          None|     0.0|
|Best Android Them...| 5762674363677973291|   1|Art & Design|             None|    1000000|     

### Cleaning the data

In [10]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import DoubleType, IntegerType

# Clean the numeric columns of `df` (the DataFrame produced by the column
# selection above): strip stray characters where needed, cast each column to
# its numeric type, then drop rows left without a value in any of them.
#
# The regex patterns are raw strings so that `\d` reaches Spark unchanged
# without triggering Python's invalid-escape-sequence warning.

# 'price': keep only digits and the decimal point, then cast to double.
df = df.withColumn('price', regexp_replace('price', r'[^\d.]', '').cast(DoubleType()))

# 'minInstalls': keep only digits, then cast to integer.
df = df.withColumn('minInstalls', regexp_replace('minInstalls', r'[^\d]', '').cast(IntegerType()))

# 'releasedYear': cast to integer.
df = df.withColumn('releasedYear', col('releasedYear').cast(IntegerType()))

# 'ratings': cast to double.
df = df.withColumn('ratings', col('ratings').cast(DoubleType()))

# 'reviews': cast to integer.
df = df.withColumn('reviews', col('reviews').cast(IntegerType()))

# 'sale': keep only digits and the decimal point, then cast to double.
df = df.withColumn('sale', regexp_replace('sale', r'[^\d.]', '').cast(DoubleType()))

# 'score': cast to double.
df = df.withColumn('score', col('score').cast(DoubleType()))

# Rename 'len screenshots' so the column name no longer contains a space.
df = df.withColumnRenamed("len screenshots", "len_screenshots")

# Drop rows with null or missing values in the cleaned columns.
# NOTE(review): this relies on failed casts yielding null (Spark's default,
# non-ANSI behaviour) — confirm the session is not running in ANSI mode.
columns_to_drop_na = ['price', 'minInstalls', 'releasedYear', 'ratings', 'reviews', 'sale', 'score']
df = df.na.drop(subset=columns_to_drop_na)

df.show()

+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+
|           developer|         developerId|free|       genre|inAppProductPrice|minInstalls|offersIAP|price|ratings|len_screenshots|adSupported|containsAds|reviews|sale|score|releasedYear|releasedMonth|      minprice|maxprice|
+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+
| Best theme workshop| 7398289236941441420|   1|Art & Design|             None|    1000000|        0|  0.0|30650.0|              4|          1|          1|    152| 0.0| 4.75|        2014|          Sep|          None|     0.0|
|Best Android Them...| 5762674363677973291|   1|Art & Design|             None|    1000000|     

## Binning required columns

### Binning `minInstalls` column

In [11]:
from pyspark.ml.feature import Bucketizer, StringIndexer
from pyspark.sql.functions import col
from pyspark.sql import functions as F

In [12]:
# Binning logic for 'minInstalls' column.
# `num_bins` holds the bucket edges passed to Bucketizer; `bin_labels` holds
# one human-readable label per bucket, in the same order as the bucket indices
# (so it must contain exactly len(num_bins) - 1 plain strings).
num_bins = [float('-inf'), 100, 1000, 10000, 50000, 100000, 500000, 1000000, 5000000, 10000000, float('inf')]
bin_labels = ["[0-100]", "[100-1000]", "[1000-10000]", "[10000-50000]", "[50000-100000]", "[100000-500000]", "[500000-1000000]", "[1000000-5000000]", "[5000000-10000000]", "[10000000+]"]

# Create Bucketizer instance; it writes the bucket index (0.0, 1.0, ...) of
# each row's 'minInstalls' value into 'installs_bin'.
bucketizer = Bucketizer(splits=num_bins, inputCol='minInstalls', outputCol='installs_bin')

# Apply the bucketizer to the DataFrame
df = bucketizer.transform(df)

# Build the index -> label mapping from `bin_labels` instead of repeating the
# labels by hand, so the list and the mapping cannot drift apart.
label_column = None
for bin_index, bin_label in enumerate(bin_labels):
    matches_bin = col("installs_bin") == float(bin_index)
    if label_column is None:
        label_column = F.when(matches_bin, bin_label)
    else:
        label_column = label_column.when(matches_bin, bin_label)

# Replace the numeric bucket index with its label.
df = df.withColumn("installs_bin", label_column)

In [13]:
# Preview the first 20 rows, now including the 'installs_bin' column.
df.show(n=20, truncate=True)

+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+-----------------+
|           developer|         developerId|free|       genre|inAppProductPrice|minInstalls|offersIAP|price|ratings|len_screenshots|adSupported|containsAds|reviews|sale|score|releasedYear|releasedMonth|      minprice|maxprice|     installs_bin|
+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+-----------------+
| Best theme workshop| 7398289236941441420|   1|Art & Design|             None|    1000000|        0|  0.0|30650.0|              4|          1|          1|    152| 0.0| 4.75|        2014|          Sep|          None|     0.0|[1000000-5000000]|
|Best Android Them...| 5

### Binning `price` column

In [14]:
# Bucket edges for 'price' and one label per bucket, in bucket-index order.
num_bins_price = [float('-inf'), 1, 5, 10, 20, 50, 100, 200, 500, 1000, float('inf')]
bin_labels_price = ["[0-1]", "[1-5]", "[5-10]", "[10-20]", "[20-50]", "[50-100]", "[100-200]", "[200-500]", "[500-1000]", "[1000+]"]

# Bucketize 'price': 'price_bin' receives the bucket index of each row.
bucketizer_price = Bucketizer(splits=num_bins_price, inputCol='price', outputCol='price_bin')
df = bucketizer_price.transform(df)

# Chain one `when` clause per bucket, mapping index i to bin_labels_price[i].
price_bucket = col("price_bin")
label_column_price = None
for bucket_index, bucket_label in enumerate(bin_labels_price):
    is_bucket = price_bucket == float(bucket_index)
    label_column_price = (
        F.when(is_bucket, bucket_label)
        if label_column_price is None
        else label_column_price.when(is_bucket, bucket_label)
    )

# Overwrite the numeric index with the label.
df = df.withColumn("price_bin", label_column_price)

In [15]:
# Preview the first 20 rows, now including the 'price_bin' column.
df.show(n=20, truncate=True)

+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+-----------------+---------+
|           developer|         developerId|free|       genre|inAppProductPrice|minInstalls|offersIAP|price|ratings|len_screenshots|adSupported|containsAds|reviews|sale|score|releasedYear|releasedMonth|      minprice|maxprice|     installs_bin|price_bin|
+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+-----------------+---------+
| Best theme workshop| 7398289236941441420|   1|Art & Design|             None|    1000000|        0|  0.0|30650.0|              4|          1|          1|    152| 0.0| 4.75|        2014|          Sep|          None|     0.0|[1000000-5000

### Binning `releasedYear` column

In [16]:
# Bucket edges for 'releasedYear' and one label per bucket, in bucket-index order.
num_bins_released_year = [float('-inf'), 2000, 2005, 2010, 2015, 2020, float('inf')]
bin_labels_released_year = ["[<2000]", "[2000-2005]", "[2005-2010]", "[2010-2015]", "[2015-2020]", "[2020+]"]

# Bucketize 'releasedYear': 'released_year_bin' receives each row's bucket index.
bucketizer_released_year = Bucketizer(
    splits=num_bins_released_year,
    inputCol='releasedYear',
    outputCol='released_year_bin',
)
df = bucketizer_released_year.transform(df)

# Chain one `when` clause per bucket, mapping index i to
# bin_labels_released_year[i].
year_bucket = col("released_year_bin")
label_column_released_year = None
for year_index, year_label in enumerate(bin_labels_released_year):
    in_bucket = year_bucket == float(year_index)
    if label_column_released_year is None:
        label_column_released_year = F.when(in_bucket, year_label)
    else:
        label_column_released_year = label_column_released_year.when(in_bucket, year_label)

# Overwrite the numeric index with the label.
df = df.withColumn("released_year_bin", label_column_released_year)

In [17]:
# Preview the first 20 rows, now including the 'released_year_bin' column.
df.show(n=20, truncate=True)

+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+-----------------+---------+-----------------+
|           developer|         developerId|free|       genre|inAppProductPrice|minInstalls|offersIAP|price|ratings|len_screenshots|adSupported|containsAds|reviews|sale|score|releasedYear|releasedMonth|      minprice|maxprice|     installs_bin|price_bin|released_year_bin|
+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+-----------------+---------+-----------------+
| Best theme workshop| 7398289236941441420|   1|Art & Design|             None|    1000000|        0|  0.0|30650.0|              4|          1|          1|    152| 0.0| 4.75|        20

### Binning `score` column

In [18]:
# Binning logic for 'score' column.
# `num_bins_score` holds the bucket edges; `bin_labels_score` holds one label
# per bucket, in bucket-index order.
num_bins_score = [float('-inf'), 1, 2, 3, 4, 5]
bin_labels_score = ["[0-1]", "[1-2]", "[2-3]", "[3-4]", "[4-5]"]

# Convert 'score' column to numeric type.
# The cast result must be written back to 'score'; writing it to 'reviews'
# (as this cell previously did) replaces the review counts with the scores.
df = df.withColumn('score', col('score').cast('double'))

# Create Bucketizer instance for 'score' column
bucketizer_score = Bucketizer(splits=num_bins_score, inputCol='score', outputCol='score_bin')

# Apply the bucketizer to the DataFrame
df = bucketizer_score.transform(df)

# Map each bucket index to its label, using `bin_labels_score` as the single
# source of the label text.
label_column_score = None
for score_index, score_label in enumerate(bin_labels_score):
    in_bucket = col("score_bin") == float(score_index)
    if label_column_score is None:
        label_column_score = F.when(in_bucket, score_label)
    else:
        label_column_score = label_column_score.when(in_bucket, score_label)

# Replace the numeric bucket index with its label.
df = df.withColumn("score_bin", label_column_score)

In [19]:
# Preview the first 20 rows, now including the 'score_bin' column.
df.show(n=20, truncate=True)

+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+-----------------+---------+-----------------+---------+
|           developer|         developerId|free|       genre|inAppProductPrice|minInstalls|offersIAP|price|ratings|len_screenshots|adSupported|containsAds|reviews|sale|score|releasedYear|releasedMonth|      minprice|maxprice|     installs_bin|price_bin|released_year_bin|score_bin|
+--------------------+--------------------+----+------------+-----------------+-----------+---------+-----+-------+---------------+-----------+-----------+-------+----+-----+------------+-------------+--------------+--------+-----------------+---------+-----------------+---------+
| Best theme workshop| 7398289236941441420|   1|Art & Design|             None|    1000000|        0|  0.0|30650.0|              4|          1|          1

# Results


## Combination of price, released_year and installs

The DataFrame `df` contains all the necessary data needed for a comprehensive analysis.

In [20]:
# Count the rows for every (price_bin, released_year_bin, installs_bin)
# combination. A single groupBy().count() already yields one row per
# combination, so no second aggregation over the same keys is needed.
selected_df = df.groupBy('price_bin', 'released_year_bin', 'installs_bin').count()

# Calculate total volume
total_volume = df.count()

# Filter out combinations smaller than 2% of the total volume
result_df = selected_df.filter((col('count') / total_volume) >= 0.02)

# Format the output as "Price=<bin>; Year=<bin>; Installs=<bin>; <count>".
# The count is cast to string explicitly rather than relying on implicit
# coercion inside concat.
result_df = result_df.withColumn(
    "output",
    F.concat(
        F.lit("Price="), col("price_bin"), F.lit("; "),
        F.lit("Year="), col("released_year_bin"), F.lit("; "),
        F.lit("Installs="), col("installs_bin"), F.lit("; "),
        col("count").cast("string")
    )
)

# Select the final output columns
final_output_df = result_df.select("output")

# Write the results as CSV (with a header row), overwriting any previous output.
output_file_path = "output/comprehensive_counts.csv"
final_output_df.write.option("header", "true").mode("overwrite").csv(output_file_path)

## Combination of released_year

In [22]:
# Count the rows in each released_year_bin. A single groupBy().count()
# already yields one row per bin, so no second aggregation over the same key
# is needed.
selected_df = df.groupBy('released_year_bin').count()

# Calculate total volume
total_volume = df.count()

# Filter out bins smaller than 2% of the total volume
result_df = selected_df.filter((col('count') / total_volume) >= 0.02)

# Format the output as "Year=<bin>; <count>".
# The count is cast to string explicitly rather than relying on implicit
# coercion inside concat.
result_df = result_df.withColumn(
    "output",
    F.concat(
        F.lit("Year="), col("released_year_bin"), F.lit("; "),
        col("count").cast("string")
    )
)

# Select the final output columns
final_output_df = result_df.select("output")

# Write the results as CSV (with a header row), overwriting any previous output.
output_file_path = "output/comprehensive_released_year_counts.csv"
final_output_df.write.option("header", "true").mode("overwrite").csv(output_file_path)