# Set Up

In [1]:
#pip install pyspark

In [2]:
import pyspark

In [3]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Import functions from pyspark.sql.functions
from pyspark.sql.functions import *

# For using window functions for aggregations or rankings
from pyspark.sql.window import Window

# For using data types when defining schemas or manipulating columns
from pyspark.sql.types import *

# Create a SparkSession, which is the entry point to programming Spark with the Dataset and DataFrame API
spark = SparkSession.builder \
    .appName("Data Cleaning with PySpark") \
    .getOrCreate()

24/04/15 16:01:12 WARN Utils: Your hostname, codespaces-f38966 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/04/15 16:01:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/15 16:01:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load dataset
df = spark.read.csv('kickstarter_data.csv', header=True, inferSchema=True)

# First 5 rows of the dataframe
df.show(5)

                                                                                

+---+----------+--------------------+--------------+-------------+--------+----------+-------+-------------------+-------+--------+-------+-------+-----------+----------------+-------------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|   goal|           launched|pledged|   state|backers|country|usd pledged|usd_pledged_real|usd_goal_real|
+---+----------+--------------------+--------------+-------------+--------+----------+-------+-------------------+-------+--------+-------+-------+-----------+----------------+-------------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09| 1000.0|2015-08-11 12:12:28|    0.0|  failed|      0|     GB|        0.0|             0.0|      1533.95|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|30000.0|2017-09-02 04:43:57| 2421.0|  failed|     15|     US|      100.0|          2421.0|      30000.0|
|  2|1000004038|      Where is Hank?|Narrativ

24/04/15 16:01:25 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, name, category, main_category, currency, deadline, goal, launched, pledged, state, backers, country, usd pledged, usd_pledged_real, usd_goal_real
 Schema: _c0, ID, name, category, main_category, currency, deadline, goal, launched, pledged, state, backers, country, usd pledged, usd_pledged_real, usd_goal_real
Expected: _c0 but found: 
CSV file: file:///workspaces/BIGDATA/WEEK3/kickstarter_data.csv


**ANNOTATION**

Dataframe Columns:
    - `goal`: Goal set at the launched time.

    - `pledge`: Total amount of funding the project successfully called.

    - `backers`: Number of investors that fund the project.

    - `usd pledged`: conversion in US dollars of the pledged column (conversion done by kickstarter).

    - `usd_pledge_real`: conversion in US dollars of the pledged column (conversion from Fixer.io API).

    - `usd_goal_real`: conversion in US dollars of the goal column (conversion from Fixer.io API).

The dataset is acquired from Kaggle.com. You can visit it here: https://www.kaggle.com/kemical/kickstarter-projects

# A. OVERVIEW AND CLEAN

## **A.1** - Remove unwanted observations

### Question 1

We have many columns for the pledge and goal with different conversions.

For this analysis, we choose to keep only `usd_pledged_real` and `usd_goal_real`.

Write one line of code to drop the columns `goal`, `pledged`, `usd pledged`.

In [5]:
# YOUR CODE HERE
df = df.drop("goal", "pledged", "usd pledged")
df.show(5)

+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+--------+-------+-------+----------------+-------------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|           launched|   state|backers|country|usd_pledged_real|usd_goal_real|
+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+--------+-------+-------+----------------+-------------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09|2015-08-11 12:12:28|  failed|      0|     GB|             0.0|      1533.95|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|2017-09-02 04:43:57|  failed|     15|     US|          2421.0|      30000.0|
|  2|1000004038|      Where is Hank?|Narrative Film| Film & Video|     USD|2013-02-26|2013-01-12 00:20:50|  failed|      3|     US|           220.0|      45000.0|
|  3|1000007540|ToshiC

24/04/15 16:01:25 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
 Schema: _c0, ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
Expected: _c0 but found: 
CSV file: file:///workspaces/BIGDATA/WEEK3/kickstarter_data.csv


### Question 2

For future convenience, let's rename the columns as follows:

- `usd_pledged_real` --> `pledged`
- `usd_goal_real` --> `goal`

Write your code to do that below.

In [6]:
# YOUR CODE HERE
df = df.withColumnRenamed("usd_pledged_real", "pledged")
df = df.withColumnRenamed("usd_goal_real", "goal")
df.show(5)

+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+--------+-------+-------+-------+-------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|           launched|   state|backers|country|pledged|   goal|
+---+----------+--------------------+--------------+-------------+--------+----------+-------------------+--------+-------+-------+-------+-------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09|2015-08-11 12:12:28|  failed|      0|     GB|    0.0|1533.95|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|2017-09-02 04:43:57|  failed|     15|     US| 2421.0|30000.0|
|  2|1000004038|      Where is Hank?|Narrative Film| Film & Video|     USD|2013-02-26|2013-01-12 00:20:50|  failed|      3|     US|  220.0|45000.0|
|  3|1000007540|ToshiCapital Reko...|         Music|        Music|     USD|2012-04-16|2012-03-17 03:24:11|  fail

24/04/15 16:01:26 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
 Schema: _c0, ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
Expected: _c0 but found: 
CSV file: file:///workspaces/BIGDATA/WEEK3/kickstarter_data.csv


## **A.2** - Structural Error, Correct Datatype
---

### Question 3

Write one line of code to print the overall information of the dataset. Are there any columns that you feel like they have the wrong datatype?

In [7]:
# YOUR CODE HERE
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: string (nullable = true)
 |-- launched: string (nullable = true)
 |-- state: string (nullable = true)
 |-- backers: string (nullable = true)
 |-- country: string (nullable = true)
 |-- pledged: string (nullable = true)
 |-- goal: string (nullable = true)



- có một số cột đang sai kiểu dữ liệu như là:
    + deadline, launched, backers, pledged, goal.

### Question 4

Convert the `launched` and `deadline` to `datetime` datatype.

In [8]:
from pyspark.sql.types import StringType,BooleanType,DateType
from pyspark.sql.functions import col

In [9]:
# YOUR CODE HERE
df = df.withColumn("launched",col("launched").cast(DateType())) \
    .withColumn("deadline",col("deadline").cast(DateType()))

Check info one more time to make sure everything goes as plan.

In [10]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: date (nullable = true)
 |-- launched: date (nullable = true)
 |-- state: string (nullable = true)
 |-- backers: string (nullable = true)
 |-- country: string (nullable = true)
 |-- pledged: string (nullable = true)
 |-- goal: string (nullable = true)



## **A.3** - Handling Missing Values
---

### Question 5

Give the number of null values in *each* column.

In [11]:
null_counts = df.select([sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
null_counts.show()

24/04/15 16:01:27 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
 Schema: _c0, ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
Expected: _c0 but found: 
CSV file: file:///workspaces/BIGDATA/WEEK3/kickstarter_data.csv

+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+
|_c0| ID|name|category|main_category|currency|deadline|launched|state|backers|country|pledged|goal|
+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+
|  0|  0|   4|       0|            3|       4|    1297|    1114|    5|      5|      5|     12|   5|
+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+



                                                                                

### Question 6

Write one line of code to fill all the `NaN` values in name with `Unknown`.

In [12]:
# YOUR CODE HERE
df = df.fillna({'name': 'Unknown'})

In [13]:
null_counts = df.select([sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
null_counts.show()

24/04/15 16:01:30 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/04/15 16:01:30 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
 Schema: _c0, ID, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
Expected: _c0 but found: 
CSV file: file:///workspaces/BIGDATA/WEEK3/kickstarter_data.csv
[Stage 8:>                                                          (0 + 2) / 2]

+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+
|_c0| ID|name|category|main_category|currency|deadline|launched|state|backers|country|pledged|goal|
+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+
|  0|  0|   0|       0|            3|       4|    1297|    1114|    5|      5|      5|     12|   5|
+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+



                                                                                

## **A.4** - Handling errors, corrupted data
---

Scanning through each column to find abnormalities and fix them. Simple as that.

### Question 7

Let's start with `category`. Write an expression to display the frequency of the value in the column `category`. (The unique values and how many times they appear)

In [14]:
from pyspark.sql.functions import count, col

In [15]:
category_frequency = df.groupBy("category").agg(count("*").alias("frequency"))
category_frequency.show()

[Stage 11:>                                                         (0 + 2) / 2]

+--------------------+---------+
|            category|frequency|
+--------------------+---------+
|         we create!"|        1|
|   I'm Your Cream"""|        1|
| a sequel to the ...|        1|
| Meadower and Car...|        1|
|     "" said Sydney"|        1|
|  "" World Premiere"|        1|
|  Italy by the Yard"|        1|
| the parody music...|        1|
|                   7|        1|
|             Alcohol|        1|
| an M.F.A. featur...|        1|
|         So Sorry"""|        1|
| Though I Walk"" ...|        1|
| court-métrage fa...|        1|
| Final  Recording...|        1|
| Lust & Straight ...|        1|
| a one man show a...|        1|
| Iran and Turkey/...|        1|
|  there's your quote|        1|
| Promotion & Dist...|        1|
+--------------------+---------+
only showing top 20 rows



                                                                                

### Question 8

Do the same to check abnormalities in the column `country`.

In [16]:
# YOUR CODE HERE
country_frequency = df.groupBy("country").agg(count("*").alias("frequency"))
country_frequency.show()

[Stage 14:>                                                         (0 + 2) / 2]

+-------+---------+
|country|frequency|
+-------+---------+
|      7|       24|
|     51|        6|
|     54|        5|
|     15|       13|
|    155|        1|
|     NL|     2865|
|     11|       17|
|    101|        2|
|     69|        4|
|     29|       10|
| 3168.0|        1|
|     42|        7|
|     73|        2|
|     87|        1|
|    468|        1|
|     MX|     1745|
|     64|        5|
|      3|       56|
|     30|        6|
|     34|        7|
+-------+---------+
only showing top 20 rows



                                                                                

### Question 9

Write an expression to select all rows with that weird value above (`N,0"`).

In [17]:
# YOUR CODE HERE
weird_rows = df.filter(df["country"] == "N,0")
weird_rows.show()

24/04/15 16:01:34 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
 Schema: _c0, ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
Expected: _c0 but found: 
CSV file: file:///workspaces/BIGDATA/WEEK3/kickstarter_data.csv
                                                                                

+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+
|_c0| ID|name|category|main_category|currency|deadline|launched|state|backers|country|pledged|goal|
+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+
+---+---+----+--------+-------------+--------+--------+--------+-----+-------+-------+-------+----+



### Question 10

Write one line of code to return the ***unique currencies*** of the projects that have country as `N,0"`?

In [18]:
# YOUR CODE HERE

### Question 11

Our mission is apply a check function onto each row of the country-N0" part.

First, define a function that takes in a whole data row.

- If currency is `USD` ---> country is `US`
- If currency is `AUD` ---> country is `AU`
- If currency is `CAD` ---> country is `CA`
- If currency is `GBP` ---> country is `GB`
- If currency is `SEK` ---> country is `SE`
- If currency is `DKK` ---> country is `DK`
- If currency is `NZD` ---> country is `NZ`
- If currency is `NOK` ---> country is `NO`
- If currency is `CHF` ---> country is `CH`
- If currency is `EUR` ---> country is `DE`

In the `EUR` case, we choose to replace by the mode --- `DE` (Within projects that in `EUR`, the most are from `DE` -- Germany)

In [20]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# YOUR CODE HERE

In [27]:
#dựng 1 function để có thể chuyển được về các country
def map_currency_to_country(currency):
    if currency == "USD":
        return "US"
    elif currency == "AUD":
        return "AU"
    elif currency == "CAD":
        return "CA"
    elif currency == "GBP":
        return "GB"
    elif currency == "SEK":
        return "SE"
    elif currency == "DKK":
        return "DK"
    elif currency == "NZD":
        return "NZ"
    elif currency == "NOK":
        return "NO"
    elif currency == "CHF":
        return "CH"
    elif currency == "EUR":
        return "DE"

In [22]:
map_currency_to_country_udf = udf(map_currency_to_country, StringType())

In [24]:
df = df.withColumn("country_mapped", map_currency_to_country_udf(df["currency"]))
df.show()

24/04/15 16:29:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
 Schema: _c0, ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
Expected: _c0 but found: 
CSV file: file:///workspaces/BIGDATA/WEEK3/kickstarter_data.csv


+---+----------+--------------------+--------------+-------------+--------+----------+----------+----------+-------+-------+---------+--------+--------------+
|_c0|        ID|                name|      category|main_category|currency|  deadline|  launched|     state|backers|country|  pledged|    goal|country_mapped|
+---+----------+--------------------+--------------+-------------+--------+----------+----------+----------+-------+-------+---------+--------+--------------+
|  0|1000002330|The Songs of Adel...|        Poetry|   Publishing|     GBP|2015-10-09|2015-08-11|    failed|      0|     GB|      0.0| 1533.95|            GB|
|  1|1000003930|Greeting From Ear...|Narrative Film| Film & Video|     USD|2017-11-01|2017-09-02|    failed|     15|     US|   2421.0| 30000.0|            US|
|  2|1000004038|      Where is Hank?|Narrative Film| Film & Video|     USD|2013-02-26|2013-01-12|    failed|      3|     US|    220.0| 45000.0|            US|
|  3|1000007540|ToshiCapital Reko...|         

                                                                                

### Question 12

Save result using .write.parquet()


In [25]:
df.write.parquet("output.parquet")

24/04/15 16:31:25 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
 Schema: _c0, ID, name, category, main_category, currency, deadline, launched, state, backers, country, usd_pledged_real, usd_goal_real
Expected: _c0 but found: 
CSV file: file:///workspaces/BIGDATA/WEEK3/kickstarter_data.csv
                                                                                

In [26]:
df_2 = spark.read.csv("https://raw.githubusercontent.com/congcuongca1207/BIGDATA/main/WEEK3/kickstarter_data.csv", header=True, inferSchema=True)

UnsupportedOperationException: None