In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\spark\\spark'

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Basic Examples").getOrCreate()

In [5]:
dataSource = "AppleStore.csv"
df = spark.read.csv(dataSource,inferSchema=True,header=True)
df.printSchema()
df.show(truncate=False)

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- track_name: string (nullable = true)
 |-- size_bytes: long (nullable = true)
 |-- currency: string (nullable = true)
 |-- price: double (nullable = true)
 |-- rating_count_tot: integer (nullable = true)
 |-- rating_count_ver: integer (nullable = true)
 |-- user_rating: double (nullable = true)
 |-- user_rating_ver: double (nullable = true)
 |-- ver: string (nullable = true)
 |-- cont_rating: string (nullable = true)
 |-- prime_genre: string (nullable = true)
 |-- sup_devices.num: integer (nullable = true)
 |-- ipadSc_urls.num: integer (nullable = true)
 |-- lang.num: integer (nullable = true)
 |-- vpp_lic: integer (nullable = true)

+---+---------+--------------------------------------------------+----------+--------+-----+----------------+----------------+-----------+---------------+-------+-----------+-----------------+---------------+---------------+--------+-------+
|_c0|id       |track_name           

In [6]:
df = df.select(['size_bytes','price','prime_genre','user_rating'])

## CAST Operations

In [7]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

df_transformed = df.withColumn("price_new",col("price").cast(StringType()))

## Create UDF

In [8]:
from pyspark.sql.functions import udf
def leftPad(string):
    string = str(string).rjust(10,'0')
    return string
convertUDF = udf(lambda z: leftPad(z),StringType())

## Creating a UDF to change the Values to Upper Case

In [14]:
convertUDF2 = udf(lambda z: str(z).upper(),StringType())

In [15]:
df_transformed = df_transformed. \
    withColumn(
        "price_new", 
        convertUDF(col("price_new")).alias("price_new")
    )
df_transformed = df_transformed. \
    withColumn(
        "prime_genre",
        convertUDF2(col("prime_genre")).alias("prime_genre")
    )

In [16]:
from pyspark.sql.functions import substring, trim
df_transformed = df_transformed.withColumn("price_new",substring('price_new',6,4))
df_transformed = df_transformed. \
    withColumn('prime_genre', trim('prime_genre'))

In [9]:
df_transformed.show()

+----------+-----+-----------------+-----------+---------+
|size_bytes|price|      prime_genre|user_rating|price_new|
+----------+-----+-----------------+-----------+---------+
| 100788224| 3.99|            Games|        4.0|     3.99|
| 158578688|  0.0|     Productivity|        4.0|      0.0|
| 100524032|  0.0|          Weather|        3.5|      0.0|
| 128512000|  0.0|         Shopping|        4.0|      0.0|
|  92774400|  0.0|        Reference|        4.5|      0.0|
|  10485713| 0.99|            Games|        4.0|     0.99|
| 227795968|  0.0|          Finance|        4.0|      0.0|
| 130242560|  0.0|            Music|        4.0|      0.0|
|  49250304| 9.99|        Utilities|        4.5|     9.99|
|  70023168| 3.99|            Games|        4.0|     3.99|
|  49618944| 4.99|            Games|        4.5|     4.99|
| 227547136| 7.99|            Games|        3.5|     7.99|
| 179979264|  0.0|        Utilities|        3.5|      0.0|
| 160925696|  0.0|          Finance|        3.5|      0.

# Analysis and Operations
## Count Distinct

In [10]:
from pyspark.sql.functions import countDistinct
df_distinct_count = df_transformed.select(countDistinct("prime_genre").alias("prime_genre_count"))
df_distinct_count.show()

+-----------------+
|prime_genre_count|
+-----------------+
|               23|
+-----------------+



## Analyze Data using analytical or windowing functions

In [11]:
from pyspark.sql.functions import count

frequencies = df_transformed. \
    groupBy('prime_genre'). \
    agg(count('prime_genre').alias('frequency')). \
    selectExpr(
    '*',
    '100*Frequency / sum(Frequency) over() Percent'
). \
    selectExpr(
    '*',
    'sum(Frequency) over(order by Frequency desc) cumulative_frequency',
    'sum(Percent) over(order by Frequency desc) cumulative_Percent'
)
frequencies.show()

+-----------------+---------+------------------+--------------------+------------------+
|      prime_genre|frequency|           Percent|cumulative_frequency|cumulative_Percent|
+-----------------+---------+------------------+--------------------+------------------+
|            Games|     3862| 53.66124774211477|                3862| 53.66124774211477|
|    Entertainment|      535| 7.433652910935112|                4397|61.094900653049876|
|        Education|      453| 6.294289287203001|                4850| 67.38918994025288|
|    Photo & Video|      349| 4.849242740030569|                5199| 72.23843268028344|
|        Utilities|      248| 3.445880227872725|                5447| 75.68431290815616|
| Health & Fitness|      180| 2.501042100875365|                5627| 78.18535500903153|
|     Productivity|      178| 2.473252744198972|                5805|  80.6586077532305|
|Social Networking|      167|2.3204112824788106|                5972| 82.97901903570931|
|        Lifestyle|  

## Dealing with Null Values

In [22]:
from pyspark.sql.functions import isnan

null_values = df_transformed.filter(
    df_transformed.price.contains('None') | \
    df_transformed.price.contains('NULL') | \
    (col("price") == '') | \
    isnan(df_transformed.price) | \
    df_transformed.price.isNull()
).count()
Not_null_values = df_transformed.count() - null_values
data = [(null_values, Not_null_values)]
df_new = spark.createDataFrame(data, ["Null_value", "Not_null_values"])
df_new.show()

+----------+---------------+
|Null_value|Not_null_values|
+----------+---------------+
|         0|           7197|
+----------+---------------+



## Create Dataframes for unique values and null values

In [24]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w2 = Window. \
    partitionBy('prime_genre'). \
    orderBy(col("user_rating").desc())
df_duplicate = df_transformed. \
    withColumn("row", row_number().over(w2)). \
    filter(col("row") > 1). \
    drop("row")
df_unique = df_transformed. \
    withColumn("row", row_number().over(w2)). \
    filter(col("row") == 1). \
    drop("row")
df_duplicate.show()
df_unique.show()

+----------+-----+-----------+-----------+---------+
|size_bytes|price|prime_genre|user_rating|price_new|
+----------+-----+-----------+-----------+---------+
|  52120576| 0.99|       BOOK|        5.0|     00.9|
|  39294976| 0.99|       BOOK|        5.0|     00.9|
| 130731008|  0.0|       BOOK|        5.0|     000.|
|  63058944|  0.0|       BOOK|        5.0|     000.|
|  77494272|  0.0|       BOOK|        5.0|     000.|
|  46465024| 3.99|       BOOK|        5.0|     03.9|
| 135236608|  0.0|       BOOK|        5.0|     000.|
|  35903488|  0.0|       BOOK|        5.0|     000.|
|  30749696|  0.0|       BOOK|        5.0|     000.|
| 805970944| 5.99|       BOOK|        5.0|     05.9|
| 798962688| 5.99|       BOOK|        5.0|     05.9|
| 307137536| 2.99|       BOOK|        5.0|     02.9|
|  53123072| 1.99|       BOOK|        5.0|     01.9|
|  81558528|  0.0|       BOOK|        4.5|     000.|
|  36942848| 2.99|       BOOK|        4.5|     02.9|
|  29496320| 1.99|       BOOK|        4.5|    