# Set Up

In [26]:
!pip install pyspark



In [27]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Import functions from pyspark.sql.functions
from pyspark.sql.functions import *

# For using window functions for aggregations or rankings
from pyspark.sql.window import Window

# For using data types when defining schemas or manipulating columns
from pyspark.sql.types import *

# Create a SparkSession, which is the entry point to programming Spark with the Dataset and DataFrame API
spark = SparkSession.builder \
    .appName("Data Cleaning with PySpark") \
    .getOrCreate()

In [28]:
# Load dataset
df = spark.read.csv('kickstarter_data.csv', header=True, inferSchema=True)

# First 5 rows of the dataframe
df.show(5)

+---+----------+--------------------+--------------+-------------+--------+----------+-------+-------------------+-------+--------+-------+-------+-----------+----------------+-------------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|   goal|           launched|pledged|   state|backers|country|usd pledged|usd_pledged_real|usd_goal_real|
+---+----------+--------------------+--------------+-------------+--------+----------+-------+-------------------+-------+--------+-------+-------+-----------+----------------+-------------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09| 1000.0|2015-08-11 12:12:28|    0.0|  failed|      0|     GB|        0.0|             0.0|      1533.95|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|30000.0|2017-09-02 04:43:57| 2421.0|  failed|     15|     US|      100.0|          2421.0|      30000.0|
|  2|1000004038|      Where is Hank?|Narrativ

**ANNOTATION**

Dataframe Columns:
    - `goal`: Goal set at the launched time.

    - `pledge`: Total amount of funding the project successfully called.

    - `backers`: Number of investors that fund the project.

    - `usd pledged`: conversion in US dollars of the pledged column (conversion done by kickstarter).

    - `usd_pledge_real`: conversion in US dollars of the pledged column (conversion from Fixer.io API).

    - `usd_goal_real`: conversion in US dollars of the goal column (conversion from Fixer.io API).

The dataset is acquired from Kaggle.com. You can visit it here: https://www.kaggle.com/kemical/kickstarter-projects

# A. OVERVIEW AND CLEAN

## **A.1** - Remove unwanted observations

### Question 1

We have many columns for the pledge and goal with different conversions.

For this analysis, we choose to keep only `usd_pledged_real` and `usd_goal_real`.

Write one line of code to drop the columns `goal`, `pledged`, `usd pledged`.

In [29]:
df = df.drop('goal', 'pledged', 'usd pledged')

In [30]:
df.show()

+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+----------+-------+-------+----------------+-------------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|           launched|     state|backers|country|usd_pledged_real|usd_goal_real|
+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+----------+-------+-------+----------------+-------------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09|2015-08-11 12:12:28|    failed|      0|     GB|             0.0|      1533.95|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|2017-09-02 04:43:57|    failed|     15|     US|          2421.0|      30000.0|
|  2|1000004038|      Where is Hank?|Narrative Film| Film & Video|     USD|2013-02-26|2013-01-12 00:20:50|    failed|      3|     US|           220.0|      45000.0|
|  3|10000

### Question 2

For future convenience, let's rename the columns as follows:

- `usd_pledged_real` --> `pledged`
- `usd_goal_real` --> `goal`

Write your code to do that below.

In [31]:
df = df.withColumnRenamed('usd_pledged_real', 'pledged')
df.show()

+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+----------+-------+-------+---------+-------------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|           launched|     state|backers|country|  pledged|usd_goal_real|
+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+----------+-------+-------+---------+-------------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09|2015-08-11 12:12:28|    failed|      0|     GB|      0.0|      1533.95|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|2017-09-02 04:43:57|    failed|     15|     US|   2421.0|      30000.0|
|  2|1000004038|      Where is Hank?|Narrative Film| Film & Video|     USD|2013-02-26|2013-01-12 00:20:50|    failed|      3|     US|    220.0|      45000.0|
|  3|1000007540|ToshiCapital Reko...|         Music|

In [32]:
df = df.withColumnRenamed('usd_goal_real', 'goal')
df.show()

+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+----------+-------+-------+---------+--------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|           launched|     state|backers|country|  pledged|    goal|
+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+----------+-------+-------+---------+--------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09|2015-08-11 12:12:28|    failed|      0|     GB|      0.0| 1533.95|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|2017-09-02 04:43:57|    failed|     15|     US|   2421.0| 30000.0|
|  2|1000004038|      Where is Hank?|Narrative Film| Film & Video|     USD|2013-02-26|2013-01-12 00:20:50|    failed|      3|     US|    220.0| 45000.0|
|  3|1000007540|ToshiCapital Reko...|         Music|        Music|     USD|2012-04

## **A.2** - Structural Error, Correct Datatype
---

### Question 3

Write one line of code to print the overall information of the dataset. Are there any columns that you feel like they have the wrong datatype?

In [33]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: string (nullable = true)
 |-- launched: string (nullable = true)
 |-- state: string (nullable = true)
 |-- backers: string (nullable = true)
 |-- country: string (nullable = true)
 |-- pledged: string (nullable = true)
 |-- goal: string (nullable = true)



In [34]:
# Du lieu bi sai dinh dang o cot ()

### Question 4

Convert the `launched` and `deadline` to `datetime` datatype.

In [35]:
df = df.withColumn("launched", to_date(df["launched"]))
df = df.withColumn("deadline", to_date(df["deadline"]))
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: date (nullable = true)
 |-- launched: date (nullable = true)
 |-- state: string (nullable = true)
 |-- backers: string (nullable = true)
 |-- country: string (nullable = true)
 |-- pledged: string (nullable = true)
 |-- goal: string (nullable = true)



Check info one more time to make sure everything goes as plan.

In [36]:
df = df.withColumn("backers", df["backers"].cast(IntegerType()))
df = df.withColumn("pledged", df["pledged"].cast(IntegerType()))
df = df.withColumn("goal", df["goal"].cast(IntegerType()))
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: date (nullable = true)
 |-- launched: date (nullable = true)
 |-- state: string (nullable = true)
 |-- backers: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- pledged: integer (nullable = true)
 |-- goal: integer (nullable = true)



## **A.3** - Handling Missing Values
---

### Question 5

Give the number of null values in *each* column.

In [37]:
# Count the number of null values in each column
null_counts = df.agg(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))

# Show the results
null_counts.show()

+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+
|_c0| ID|name|category|main_category|currency|deadline|launched|state|backers|country|pledged|goal|
+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+
|  0|  0|   4|       0|            3|       4|    1297|    1114|    5|   1111|      5|    198|  24|
+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+



### Question 6

Write one line of code to fill all the `NaN` values in name with `Unknown`.

In [38]:
df = df.na.fill('Unknown', subset=['name'])

In [39]:
df.show()

+---+----------+--------------------+--------------+-------------+--------+----------+----------+----------+-------+-------+-------+------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|  launched|     state|backers|country|pledged|  goal|
+---+----------+--------------------+--------------+-------------+--------+----------+----------+----------+-------+-------+-------+------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09|2015-08-11|    failed|      0|     GB|      0|  1533|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|2017-09-02|    failed|     15|     US|   2421| 30000|
|  2|1000004038|      Where is Hank?|Narrative Film| Film & Video|     USD|2013-02-26|2013-01-12|    failed|      3|     US|    220| 45000|
|  3|1000007540|ToshiCapital Reko...|         Music|        Music|     USD|2012-04-16|2012-03-17|    failed|      1|     US|      1|  5000|
|  4|1000011046|Comm

## **A.4** - Handling errors, corrupted data
---

Scanning through each column to find abnormalities and fix them. Simple as that.

### Question 7

Let's start with `category`. Write an expression to display the frequency of the value in the column `category`. (The unique values and how many times they appear)

In [40]:
df.groupBy("category").agg(count("*").alias("frequency")).show()

+--------------------+---------+
|            category|frequency|
+--------------------+---------+
|         we create!"|        1|
|   I'm Your Cream"""|        1|
| a sequel to the ...|        1|
| Meadower and Car...|        1|
|     "" said Sydney"|        1|
|  "" World Premiere"|        1|
|  Italy by the Yard"|        1|
| the parody music...|        1|
|                   7|        1|
|             Alcohol|        1|
| an M.F.A. featur...|        1|
|         So Sorry"""|        1|
| Though I Walk"" ...|        1|
| court-métrage fa...|        1|
| Final  Recording...|        1|
| Lust & Straight ...|        1|
| a one man show a...|        1|
| Iran and Turkey/...|        1|
|  there's your quote|        1|
| Promotion & Dist...|        1|
+--------------------+---------+
only showing top 20 rows



### Question 8

Do the same to check abnormalities in the column `country`.

In [41]:
df.groupBy("country").agg(count("*").alias("frequency")).show()

+-------+---------+
|country|frequency|
+-------+---------+
|      7|       24|
|     51|        6|
|     54|        5|
|     15|       13|
|    155|        1|
|     NL|     2865|
|     11|       17|
|    101|        2|
|     69|        4|
|     29|       10|
| 3168.0|        1|
|     42|        7|
|     73|        2|
|     87|        1|
|    468|        1|
|     MX|     1745|
|     64|        5|
|      3|       56|
|     30|        6|
|     34|        7|
+-------+---------+
only showing top 20 rows



### Question 9

Write an expression to select all rows with that weird value above (`N,0"`).

In [48]:
filtered_df = df.filter(df.currency == 'N,0"')
filtered_df.show()

+--------+
|currency|
+--------+
+--------+



### Question 10

Write one line of code to return the ***unique currencies*** of the projects that have country as `N,0"`?

In [57]:
unique_currencies = df.filter(df.currency == 'N,0"').select('currency').distinct().rdd.map(lambda x: x[0]).collect()

### Question 11

Our mission is apply a check function onto each row of the country-N0" part.

First, define a function that takes in a whole data row.

- If currency is `USD` ---> country is `US`
- If currency is `AUD` ---> country is `AU`
- If currency is `CAD` ---> country is `CA`
- If currency is `GBP` ---> country is `GB`
- If currency is `SEK` ---> country is `SE`
- If currency is `DKK` ---> country is `DK`
- If currency is `NZD` ---> country is `NZ`
- If currency is `NOK` ---> country is `NO`
- If currency is `CHF` ---> country is `CH`
- If currency is `EUR` ---> country is `DE`

In the `EUR` case, we choose to replace by the mode --- `DE` (Within projects that in `EUR`, the most are from `DE` -- Germany)

In [44]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Currency to Country Check") \
    .getOrCreate()

data = [("USD",), ("AUD",), ("CAD",), ("GBP",), ("SEK",), ("DKK",), ("NZD",), ("NOK",), ("CHF",), ("EUR",)]
columns = ["currency"]

df = spark.createDataFrame(data, columns)

df_with_country = df.withColumn("country",
    when(df["currency"] == "USD", "US")
    .when(df["currency"] == "AUD", "AU")
    .when(df["currency"] == "CAD", "CA")
    .when(df["currency"] == "GBP", "GB")
    .when(df["currency"] == "SEK", "SE")
    .when(df["currency"] == "DKK", "DK")
    .when(df["currency"] == "NZD", "NZ")
    .when(df["currency"] == "NOK", "NO")
    .when(df["currency"] == "CHF", "CH")
    .otherwise("DE")
)

df_with_country.show()


+--------+-------+
|currency|country|
+--------+-------+
|     USD|     US|
|     AUD|     AU|
|     CAD|     CA|
|     GBP|     GB|
|     SEK|     SE|
|     DKK|     DK|
|     NZD|     NZ|
|     NOK|     NO|
|     CHF|     CH|
|     EUR|     DE|
+--------+-------+



### Question 12

Save result using .write.parquet()

In [45]:
df_with_country.write.parquet("output.parquet")