<a href="https://colab.research.google.com/github/ShachiniMekala/Google_PlayStore_Analysis/blob/main/pySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Background Work

In [None]:
!apt install python3-wget

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  python-asn1crypto python-cffi-backend python-cryptography python-enum34
  python-idna python-ipaddress python-openssl python-six python-urllib3
Suggested packages:
  python-cryptography-doc python-cryptography-vectors python-enum34-doc
  python-openssl-doc python-openssl-dbg python-ntlm python-socks
The following NEW packages will be installed:
  python-asn1crypto python-cffi-backend python-cryptography python-enum34
  python-idna python-ipaddress python-openssl python-six python-urllib3
  python3-wget
0 upgraded, 10 newly installed, 0 to remove and 34 not upgraded.
Need to get 647 kB of archives.
After this operation, 3,808 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu 

In [None]:
import wget
def bar_custom(current, total, width=80):
    print_str=("\r\rDownloading: %d%% [%dM / %dM] bytes" % (current / total * 100, current/(1024*1024), total/(1024*1024)))
    print (len(print_str)*'\b',print_str, end ="")
#Now use this like below,
url = 'https://storage.googleapis.com/kaggle-data-sets/157336/1712743/compressed/Google-Playstore.csv.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20210516%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20210516T172807Z&X-Goog-Expires=259199&X-Goog-SignedHeaders=host&X-Goog-Signature=4e56e6e297627b969ceec99c8f39b0b3d2aaeb16b0944f073656904bbdaf826d29c896fdc82537549d1bf91a524370a2d078b4cda34c69afa755f6966e1b46112c5aa0c7172d6691836f0b26688713b223c81d0026fe04a923037d07499aeeb8d407790b95c92b8050603a30b0677ba0754ab5dbcb9d2b06b119f007d0433558170507e5013438fe8081927b1efbd8a073ef31e81de71276570a7869717fc7b3dbc73042846458ecf2e9d94c5fa5b26fbf71bf11880ffb61192df3747146d20ca800cf06386a0c563a3b96243e047a5aac17b12cf2709b712027ed31d8424057f0063bc3bb3c60f0de710a3bd78a13d8f66d8776de333255f25d58d496d26d69'
save_path = "/content/"
wget.download(url, save_path, bar=bar_custom)

print('\nfinished...!')

In [None]:
!unzip "/content/Google-Playstore.csv.zip"

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

In [None]:
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

In [None]:
findspark.find()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, regexp_replace, lit, round, length, trim, concat, to_date
from pyspark.sql.types import StringType, BooleanType, IntegerType, FloatType, DecimalType, DateType, DoubleType, LongType
from pyspark.ml.feature import Imputer

In [None]:
spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()

In [None]:
dataset = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', quote='"',
                                                                delimiter=',').load('/content/Google-Playstore.csv')

# Data Preprocessing

# Drop duplicate values

In [None]:
print('\nOriginal count: ', dataset.count())
dataset = dataset.dropDuplicates()
print('\nAfter removing duplicate values: ', dataset.count())

# Drop unwanted columns

In [None]:
dataset = dataset.drop('Installs', 'Minimum Installs', 'Price', 'Currency', 'Developer Website',
                       'Developer Email', 'Privacy Policy', 'Editors Choice')
dataset.show()
print('Unwanted columns dropped')
print(dataset.count())

# Drop Null values

In [None]:
dataset = dataset.na.drop(how='any', subset=['Category', 'Developer Id', 'Ad Supported', 'In App Purchases'])
dataset.show()
print('Null values dropped')
print(dataset.count())

# Get null count

In [None]:
dataset.select([count(when(col(c).isNull(), c)).alias(c) for c in dataset.columns]).show()

# Data Cleaning for "Free" column

In [None]:
dataset = dataset.filter(
    dataset['Free'].contains('True') |
    dataset['Free'].contains('False')
)
dataset.show()
print(dataset.count())

# Data cleaning for "Category" column

In [None]:
dataset = dataset.filter(
    dataset.Category.contains('Communication') |
    dataset.Category.contains('Strategy') |
    dataset.Category.contains('Tools') |
    dataset.Category.contains('Music & Audio') |
    dataset.Category.contains('Maps & Navigation') |
    dataset.Category.contains('Lifestyle') |
    dataset.Category.contains('Educational') |
    dataset.Category.contains('Education') |
    dataset.Category.contains('Productivity') |
    dataset.Category.contains('Business') |
    dataset.Category.contains('Board') |
    dataset.Category.contains('Sports') |
    dataset.Category.contains('Medical') |
    dataset.Category.contains('Finance') |
    dataset.Category.contains('Parenting') |
    dataset.Category.contains('Puzzle') |
    dataset.Category.contains('Casual') |
    dataset.Category.contains('Events') |
    dataset.Category.contains('Music') |
    dataset.Category.contains('Trivia') |
    dataset.Category.contains('Arcade') |
    dataset.Category.contains('Personalization') |
    dataset.Category.contains('Entertainment') |
    dataset.Category.contains('Action') |
    dataset.Category.contains('Travel & Local') |
    dataset.Category.contains('Auto & Vehicles') |
    dataset.Category.contains('Health & Fitness') |
    dataset.Category.contains('House & Home') |
    dataset.Category.contains('News & Magazines') |
    dataset.Category.contains('Food & Drink') |
    dataset.Category.contains('Books & Reference') |
    dataset.Category.contains('Shopping') |
    dataset.Category.contains('Simulation') |
    dataset.Category.contains('Racing') |
    dataset.Category.contains('Weather') |
    dataset.Category.contains('Adventure') |
    dataset.Category.contains('Social') |
    dataset.Category.contains('Word') |
    dataset.Category.contains('Comics') |
    dataset.Category.contains('Card') |
    dataset.Category.contains('Casino') |
    dataset.Category.contains('Beauty') |
    dataset.Category.contains('Dating') |
    dataset.Category.contains('Libraries & Demo') |
    dataset.Category.contains('Video Players & Editors') |
    dataset.Category.contains('Art & Design') |
    dataset.Category.contains('Role Playing') |
    dataset.Category.contains('Photography')
)
dataset.show()
print(dataset.count())

# Data cleaning for "Content Rating" column

In [None]:
dataset = dataset.filter(
    dataset['Content Rating'].contains('Everyone') |
    dataset['Content Rating'].contains('Teen') |
    dataset['Content Rating'].contains('Adults only 18+') |
    dataset['Content Rating'].contains('Mature 17+') |
    dataset['Content Rating'].contains('Everyone 10+') 
)
dataset.show()
print(dataset.count())

# Data cleaning for "Size" column

In [None]:
dataset = dataset.filter(
    dataset.Size.contains('M') |
    dataset.Size.contains('G') |
    dataset.Size.contains('k') |
    dataset.Size.contains('Varies with device') 
)
dataset = dataset.withColumn('Size', regexp_replace(col('Size'), r'(M)', ''))
dataset = dataset.withColumn('Size',
                             when(
                                 dataset.Size.contains('G'),
                                 round(regexp_replace(col('Size'), r'(G)', '').cast('float') * 1024, 2)
                             ).
                             otherwise(col('Size')))
dataset = dataset.withColumn('Size',
                             when(
                                 dataset.Size.contains('k'),
                                 round(regexp_replace(col('Size'), r'(k)', '').cast('float') / 1024, 2)
                             ).
                             otherwise(col('Size')))
dataset = dataset.withColumn('Size',
                             when(
                                 dataset.Size.contains('Varies'),
                                 lit(None)
                                 # np.nan
                             ).
                             otherwise(col('Size')))
dataset = dataset.withColumn('Size', col("Size").cast(FloatType()))
dataset = Imputer(
    inputCol='Size',
    outputCol='Size'
).setStrategy("mean").fit(dataset).transform(dataset).withColumn('Size', round(col('Size'), 2))
dataset.show()
dd = dataset

# Data cleaning for "Released" column

In [None]:
dataset = dd.withColumn('Released', when(col('Released').isNull(), col('Last Updated')).otherwise(col('Released')))
dataset = dataset.filter(
    dataset['Released'].contains('Feb') |
    dataset['Released'].contains('Mar') |
    dataset['Released'].contains('Apr') |
    dataset['Released'].contains('Jan') |
    dataset['Released'].contains('May') |
    dataset['Released'].contains('Jun') |
    dataset['Released'].contains('Jul') |
    dataset['Released'].contains('Aug') |
    dataset['Released'].contains('Sep') |
    dataset['Released'].contains('Oct') |
    dataset['Released'].contains('Nov') |
    dataset['Released'].contains('Dec')
)
dataset = dataset.withColumn('month', trim(col('Released')).substr(1,3))
dataset = dataset.withColumn('date', trim(col('Released')).substr(5,2))
dataset = dataset.withColumn('year', trim(col('Released')).substr(9,4))
dataset = dataset.filter((dataset.date >= 1) & (dataset.date <= 31))
dataset = dataset.filter((dataset.year >= 2000) & (dataset.year <= 2021))
dataset = dataset.filter(dataset.month.rlike('(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)'))
months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
for m in months:
  dataset = dataset.withColumn('month',
                               when(
                                   dataset['Released'].contains(m),
                                    lit(months.index(m)+1)
                                    ).otherwise(col('month')))                               
dataset = dataset.withColumn('Released',concat(dataset.year,lit('-'),dataset.month,lit('-'),dataset.date))
dataset = dataset.withColumn('Released', col('Released').cast(DateType()))
dataset = dataset.drop('month','year','date')
dataset.show()
print(dataset.count())

# Data cleaning for "Last Updated" column

In [None]:
dataset = dataset.filter(
    dataset['Last Updated'].contains('Jan') |
    dataset['Last Updated'].contains('Feb') |
    dataset['Last Updated'].contains('Mar') |
    dataset['Last Updated'].contains('Apr') |
    dataset['Last Updated'].contains('May') |
    dataset['Last Updated'].contains('Jun') |
    dataset['Last Updated'].contains('Jul') |
    dataset['Last Updated'].contains('Aug') |
    dataset['Last Updated'].contains('Sep') |
    dataset['Last Updated'].contains('Oct') |
    dataset['Last Updated'].contains('Nov') |
    dataset['Last Updated'].contains('Dec')
)
dataset = dataset.withColumn('month', trim(col('Last Updated')).substr(1,3))
dataset = dataset.withColumn('date', trim(col('Last Updated')).substr(5,2))
dataset = dataset.withColumn('year', trim(col('Last Updated')).substr(9,4))
dataset = dataset.filter((dataset.date >= 1) & (dataset.date <= 31))
dataset = dataset.filter((dataset.year >= 2000) & (dataset.year <= 2021))
dataset = dataset.filter(dataset.month.rlike('(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)'))
months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
for m in months:
  dataset = dataset.withColumn('month',
                               when(
                                   dataset['Last Updated'].contains(m),
                                    lit(months.index(m)+1)
                                    ).otherwise(col('month')))                   
dataset = dataset.withColumn('Last Updated',concat(dataset.year,lit('-'),dataset.month,lit('-'),dataset.date))
dataset = dataset.withColumn('Last Updated', col('Last Updated').cast(DateType()))
dataset = dataset.drop('month','year','date')
dataset.show()

# Data cleaning for 'Minimum Android' column

In [None]:
dataset = dataset.withColumn('Minimum Android',
                             when(
                                 dataset['Minimum Android'].contains('Varies'),
                                 lit(None)
                             ).
                             otherwise(col('Minimum Android')))
dataset = dataset.withColumn('Minimum Android',
                             when(
                                 dataset['Minimum Android'].contains('and up'),
                                 trim(regexp_replace(col('Minimum Android'), r'(and up)', ''))
                             ).
                             otherwise(col('Minimum Android')))
dataset = dataset.withColumn('Minimum Android',
                             when(
                                 length(trim(dataset['Minimum Android']))>3,
                                 trim(dataset['Minimum Android'].substr(1,3))
                             ).
                             otherwise(col('Minimum Android')))
dataset = dataset.withColumn('Minimum Android',col('Minimum Android').cast(FloatType()))

dataset = Imputer(
    inputCol='Minimum Android',
    outputCol='Minimum Android'
).setStrategy("mode").fit(dataset).transform(dataset)
dataset = dataset.filter((dataset['Minimum Android'] > 1.0) & (dataset['Minimum Android'] < 10.0))
dataset.show()

# Cast data types

In [None]:
dataset = dataset.withColumn('Ad Supported', col('Ad Supported').cast(BooleanType())) \
    .withColumn('In App Purchases', col('In App Purchases').cast(BooleanType())) \
    .withColumn('Maximum Installs', col('Maximum Installs').cast(LongType())) \
    .withColumn('Rating', col('Rating').cast(FloatType())) \
    .withColumn('Rating Count', col('Rating Count').cast(IntegerType()))


# Drop Rating null values

In [None]:
#dataset = dataset.filter(dataset.Rating.isNotNull())
#dataset.show()
#print(dataset.count())

# Get null count

In [None]:
dataset.select([count(when(col(c).isNull(), c)).alias(c) for c in dataset.columns]).show()

In [None]:
dataset.show()

# Data cleaning for "Rating" and "Rating Count" columns 

In [None]:
import pandas as pd

#converts pyspark dataframe into pandas
df = dataset.toPandas()

In [None]:
import numpy as np

#Splitting data into equi width bins according to the values in 'Maximum Installs' column
labels = ['Very Low','Low', 'Average', 'High', 'Very High']

min_value = df['Maximum Installs'].min()
max_value = df['Maximum Installs'].max()

bins = np.linspace(min_value,max_value,6)

#insert a new column according to the bin
df['Install State'] = pd.cut(df['Maximum Installs'], bins=bins, labels=labels, include_lowest=True)

display(df)

In [None]:
#calculating mean for each bin
result_rating_mean = df.groupby('Install State').agg({'Rating': ['mean']})
result_rating_mean.columns = result_rating_mean.columns.droplevel(0)

#assign mean in each bin into variables
rating_very_low=result_rating_mean['mean'].values[0]
rating_low=result_rating_mean['mean'].values[1]
rating_average=result_rating_mean['mean'].values[2]
rating_high=result_rating_mean['mean'].values[3]
rating_very_high=result_rating_mean['mean'].values[4]

In [None]:
#dataset.describe(['Maximum Installs']).show()

In [None]:
#calculating mean for each bin
result_rating_count_mean=df.groupby('Install State').agg({'Rating Count': ['mean']})
result_rating_count_mean.columns = result_rating_count_mean.columns.droplevel(0)

#assign mean in each bin into variables
rcount_very_low=result_rating_count_mean['mean'].values[0]
rcount_low=result_rating_count_mean['mean'].values[1]
rcount_average=result_rating_count_mean['mean'].values[2]
rcount_high=result_rating_count_mean['mean'].values[3]
rcount_very_high=result_rating_count_mean['mean'].values[4]

In [None]:
#take null values count before replacing
nan_rating=df['Rating'].isna().sum() #null count
print("Before Replacing - 'Rating' Null Values Count ")
print(nan_rating)

nan_rating_count=df['Rating Count'].isna().sum() #null count
print("Before Replacing - 'Rating Count' Null Values Count ")
print(nan_rating_count)

In [None]:
#for Ratings
df['Rating'] = df.apply(lambda row: rating_very_low if pd.isnull(row['Rating']) and row['Install State']=='Very Low' \
                        else rating_low if pd.isnull(row['Rating']) and row['Install State']=='Low' \
                        else rating_average if pd.isnull(row['Rating']) and row['Install State']=='Average' \
                        else rating_high if pd.isnull(row['Rating']) and row['Install State']=='High' \
                        else rating_very_high if pd.isnull(row['Rating']) and row['Install State']=='Very High' \
                        else row['Rating'], axis=1)

#for rating count
df['Rating Count'] = df.apply(lambda row: rcount_very_low if pd.isnull(row['Rating Count']) and row['Install State']=='Very Low' \
                              else rcount_low if pd.isnull(row['Rating Count']) and row['Install State']=='Low' \
                              else rcount_average if pd.isnull(row['Rating Count']) and row['Install State']=='Average' \
                              else rcount_high if pd.isnull(row['Rating Count']) and row['Install State']=='High' \
                              else rcount_very_high if pd.isnull(row['Rating Count']) and row['Install State']=='Very High' \
                              else row['Rating Count'], axis=1)

#rounding values
df['Rating']=np.round(df['Rating'], decimals=1)

In [None]:
#take null values count after replacing
nan_rating=df['Rating'].isna().sum() #null count
print("After Replacing - 'Rating' Null Values Count ")
print(nan_rating)

nan_rating_count=df['Rating Count'].isna().sum() #null count
print("After Replacing - 'Rating Count' Null Values Count ")
print(nan_rating_count)

display(df)

# Correlation

**Negative correlation :**
The y values tend to decrease as the x values increase. This shows strong negative correlation, which occurs when large values of one feature correspond to small values of the other, and vice versa.

**Weak or no correlation** **:** 
Occurs when an association between two features is not obvious or is hardly observable.

**Positive correlation** **:** 
Strong positive correlation, which occurs when large values of one feature correspond to large values of the other, and vice versa.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats

In [None]:
#correlation matrix
df_numerical=df[['Rating','Rating Count','Maximum Installs','Size','Minimum Android']]
df_numerical.corr()

In [None]:
x=np.array(df['Rating Count'].tolist())
y=np.array(df['Maximum Installs'].tolist())

# x=np.array(df['Rating'].tolist())
# y=np.array(df['Maximum Installs'].tolist())

# x=np.array(df['Rating'].tolist())
# y=np.array(df['Size'].tolist())

r = np.corrcoef(x, y)

slope, intercept, r, p, stderr = scipy.stats.linregress(x, y)

line = f'Regression line: y={intercept:.2f}+{slope:.2f}x, r={r:.2f}'

import matplotlib.pyplot as plt
plt.style.use('ggplot')
fig, ax = plt.subplots()
ax.plot(x, y, linewidth=0, marker='s', label='Data points')
ax.plot(x, intercept + slope * x, label=line)
ax.set_xlabel('Rating Count')
ax.set_ylabel('Installs')
ax.legend(facecolor='white')
plt.show()

In [None]:
######################################### Do not Run ###################################################333
#rounding off values
df['Rating']=df['Rating'].round(decimals=1)
# df['Rating']=df['Rating Count'].round(decimals=1)


# Finalizing the cleaned dataset

In [None]:
#count the null count in each column
null_count=df.isnull().sum()
null_count

In [None]:
#Export the Cleanes Datased into a .CSV
df.to_csv(r'/content/Google-Playstore-Cleaned.csv')

# Data Analytics

# Identify the most rated category in Google play store

In [None]:
category_wise_mean=df.groupby('Category').agg({'Rating': ['mean']})
category_wise_mean=category_wise_mean.columns.droplevel(0)

category_wise_mean_df = pd.DataFrame(category_wise_mean)
category_wise_mean_df
#print(category_wise_mean[category_wise_mean['mean'] == category_wise_mean['mean'].max()])

Unnamed: 0,0
0,mean
