# ⭐ 1- Intro to Data Cleaning and Preparation ⭐
In this chapter, you will: 

•	Exercise 1: Load data into a Spark DataFrame (DF) 

•	Exercise 2: Query the DF to get a feel for the data 

•	Exercise 3: Filter and transform the data 



In [None]:
from pyspark.sql import SparkSession 


In [None]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Intro") \
    .getOrCreate()

## Exercise 1: Load the data:

In [None]:
df = spark.read.csv('training_bot_data.csv', header=True)

### What is the size of the data? Use the `count()` function

In [None]:
# understand what the data size is:

#### Immutability

DataFrame in Spark is **immutable**.

What does that mean?
It means that the transformations we apply to a DataFrame (like `select` or `limit`) don't change the original DataFrame!

Instead, each one creates a new DataFrame.
Run the next commands and get a feel for working with DataFrames.

Don't worry if you don't understand everything completely, the next exercises go deeper into it.

In [None]:
df.limit(2).toPandas()

In [None]:
df_new = df.select('bot')

In [None]:
df_new.limit(2).toPandas()

You probably noticed that `df_new` and `df` are different!
They are pointers to two different DataFrames.

Try the next command:

In [None]:
df.select('bot').limit(2).toPandas()

In [None]:
df.limit(2).toPandas()

The last two `toPandas()` commands printed different results.

### Why?

`df.select('bot')` returns a pointer to a new, immutable DataFrame; `df` itself is unchanged. AHA!

Let's have `df_new` and `df` point to the same DataFrame.
By doing this, we release the pointer held by `df_new`, and the DataFrame it pointed to can be erased from memory.

If we wish to access it again, we will need to rerun the logic.
Bear that in mind when working with `Apache Spark`.

In [None]:
df_new = df
df_new.limit(2).toPandas()

By the way, `limit(2)` also returns a pointer to a new DataFrame, one with at most 2 rows.

Interesting! This is what **Immutability** means!! 


## Exercise 2: Get a feel for the data

Look at 2 records from the DataFrame to understand the values better before filtering. Use the `take()` function, which returns the first n rows as a list of `Row` objects:

```python
df.take(2)
```

Check out the schema structure of the DataFrame.

What are the types of columns?
Use:

```python
df.printSchema()
```

In [None]:
df.printSchema()

Run the next function:

```python
df.limit(5).toPandas()
```

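What happened here? `toPandas()` collected the Spark DataFrame to the driver and converted it into a pandas DataFrame.

#### Run this function only on small datasets, or on a small sample (for example with `limit()`), when exploring the data. 
#### Otherwise, you might run out of memory, because all the rows are loaded into the driver's memory! 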


How many rows have missing values? Run the next command to find out! 

```python
import pyspark.sql.functions as f
from functools import reduce
df.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))).count()
```

> [functools](https://docs.python.org/3/library/functools.html) is a Python standard-library module.
> 
> [reduce](https://docs.python.org/3/library/functools.html?highlight=reduce#functools.reduce) is part of functools. It takes a function of two arguments, `x` and `y`, and applies it cumulatively to the items of an iterable, reducing them to a single value. In our case, the function is `x | y`.
> On Spark `Column` objects, `|` is the logical OR operator (Python's `or` keyword cannot be used with columns), so the reduction chains the per-column null checks together with OR.

> For example, `reduce(lambda x, y: x+y, [1, 2, 3, 4, 5])` calculates `((((1+2)+3)+4)+5)`.
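
To see what `reduce` builds without Spark, here is a plain-Python sketch that applies the same fold to a few rows. It is only an illustration: the sample rows and the `row_has_null` helper are hypothetical, `None` stands in for Spark's null, and `operator.or_` plays the role that `|` plays on Spark `Column` objects.

```python
from functools import reduce
import operator

def row_has_null(row, columns):
    """Fold one 'is None' check per column together with OR, like the Spark expression."""
    checks = (row[c] is None for c in columns)
    # reduce(operator.or_, [a, b, c]) evaluates ((a | b) | c)
    return reduce(operator.or_, checks)

# Hypothetical rows; None plays the role of a Spark null
rows = [
    {"id": "1", "screen_name": "alice", "bot": "0"},
    {"id": "2", "screen_name": None, "bot": "1"},
    {"id": None, "screen_name": "carol", "bot": None},
]
columns = ["id", "screen_name", "bot"]

rows_with_nulls = sum(1 for r in rows if row_has_null(r, columns))
print(rows_with_nulls)  # 2
print(reduce(lambda x, y: x + y, [1, 2, 3, 4, 5]))  # 15
```

In Spark, the fold produces one `Column` expression instead of a `True`/`False` value, and `where` evaluates that expression on every row.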


Run only the `reduce` expression and check the output:

`reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))`



In [None]:
reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))

You built a single Column expression that chains an `IS NULL` check for every column with `OR`!

Now, put it together:

In [None]:
import pyspark.sql.functions as f
from functools import reduce

reducePhrase = reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))

df.where(reducePhrase).count()

### Distinct Values

Count the distinct values of `id`; the count should equal the number of rows in the data.

Try both `id` and `id_str` fields.

Use the next code and adjust it according to the field:

```python
df.select("field_name").distinct().count()
```


What happened here? Is it the same as the number of rows in the dataset?
Don't worry, we will fix that soon!

### Is Null

How many rows have null in the `screen_name` column?

Use `where` with the column's `isNull()` method to get a DataFrame containing only the rows where `column_name` is null.

Count it! Use the `count` method for that.

Code sample:
```python
df.where(f.col('column_name').isNull()).count()
```

<details><summary>Answer</summary>
<p>

#### 5

</p>
</details>

### Standard Deviation

When exploring the data, the standard deviation (stddev) is a must!

Calculate **stddev** for `followers_count`.

### Notice! 
Some rows have None/null for `followers_count`. We can:

1. Ignore them and leave them out of the stddev calculation

**OR** 

2. Give them a default value

**OR** 

3. Filter them entirely out of our training data.

Start by counting how many rows have null for `followers_count`:

Run this:
```python
df.where(f.col('followers_count').isNull()).count()
```


We go with:  `2. Give them a default value`
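
Before committing to option 2, it helps to see how the choice changes the result. The plain-Python sketch below uses made-up `followers_count` values and `statistics.stdev`, which computes the sample standard deviation, the same statistic Spark's `stddev` (an alias of `stddev_samp`) reports. Spark aggregate functions skip nulls, so option 1 is what `stddev` does by default; option 3 gives the same number for this column but also removes those rows from the rest of the data.

```python
import statistics

# Hypothetical followers_count values as read from the CSV (strings, with nulls)
raw = ["10", None, "30", "20"]

# Option 1 (ignore) and option 3 (filter out): only the non-null values take part
kept = [int(v) for v in raw if v is not None]

# Option 2 (default value): nulls become "0" before casting, so they pull the mean down
filled = [int(v if v is not None else "0") for v in raw]

std_ignore = statistics.stdev(kept)      # sample stddev of [10, 30, 20]
std_default = statistics.stdev(filled)   # sample stddev of [10, 0, 30, 20]
print(round(std_ignore, 3), round(std_default, 3))  # 10.0 12.91
```

Filling with 0 increases the spread here because 0 is far from the other values; that is the trade-off to keep in mind when picking a default.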

#### Give default values: fill null values with fillna()

Give the null cells a default value
using [fillna](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.fillna).

Notice that the default value must match the column's type.
Meaning, if a column is of type string, we need a default value of type string.
At the moment, all our fields are of type string, because the CSV was read without schema inference.

Code sample:
```python
df_defaultvalue = df.fillna({'column_name':'0'})
```

<details><summary>Answer</summary>
<p>

df_defaultvalue = df.fillna({'followers_count':'0'})

</p>
</details>

Remember to validate yourself with count:

```python
df_defaultvalue.where(f.col('followers_count').isNull()).count()
```

The second phase of the **standard deviation** calculation is:

Casting the data to numbers!

Cast it to integer:

In the code sample, replace the `column_name` with `followers_count`:
```python
from pyspark.sql.types import IntegerType

data_df = df_defaultvalue.withColumn("column_name", df_defaultvalue["column_name"].cast(IntegerType()))
```
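Casting can introduce new nulls. Assuming ANSI mode is off (the default before Spark 4.0; with ANSI mode on, an invalid cast raises an error instead), Spark's `cast` turns a string that is not a valid integer into null. The plain-Python sketch below imitates `fillna` followed by the integer cast under that assumption; `fill_then_cast` is a hypothetical helper, and it ignores Spark details such as whitespace handling and integer overflow.

```python
def fill_then_cast(values, default="0"):
    """Mimic df.fillna({'col': default}) followed by .cast(IntegerType())."""
    result = []
    for v in values:
        v = default if v is None else v   # fillna: replace nulls only
        try:
            result.append(int(v))          # the cast succeeds for integer strings
        except ValueError:
            result.append(None)            # assumed non-ANSI behavior: bad strings become null
    return result

print(fill_then_cast(["10", None, "abc", "42"]))  # [10, 0, None, 42]
```

This is why it is worth re-running the `isNull()` count after the cast, not only after `fillna`.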

Calculate Standard Deviation! 

Use `pyspark.sql.functions` methods, [here are the docs](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)

Check out the **describe** functionality. It provides `count`, `mean`, `stddev`, `min`, and `max` calculations in one function!

**Remember** - Use the last DataFrame that you created, with the casting and default values.


`describe` can compute statistics for specific columns, or for all numeric and string columns when called with no arguments.

Code Example:
```python
df.describe(['age']).show()
df.describe().toPandas().transpose()

```

In the code example, change `age` to `followers_count` and run it!



This data is dirty! 

Have you noticed a weird behavior with `id` and `id_str`?

Run `.distinct().count()` on each, and count how many blank values each one has.

Which one has more distinct values? Is either count equal to the number of rows in the DataFrame?


Use the code sample and remember to replace the column name accordingly:
```python
df.select("id_str").distinct().count()
df.select("id").distinct().count()
```
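One reason the distinct count can come out lower than the row count: by default, Spark's CSV reader loads empty fields as nulls, and `distinct()` keeps a single null, so all the blank rows collapse into one "value". The plain-Python sketch below illustrates this with a made-up `id_str` column, using a `set` where `None` stands in for null.

```python
# Hypothetical id_str values for five rows; None stands in for a blank (null) field
id_str = ["101", "102", None, None, "103"]

row_count = len(id_str)
distinct_count = len(set(id_str))           # like df.select("id_str").distinct().count()
null_count = sum(v is None for v in id_str)  # like df.where(f.col("id_str").isNull()).count()

print(row_count, distinct_count, null_count)  # 5 4 2
```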

You probably discovered that we can't trust either `id` or `id_str`!

Oops! What should we do? Do we need them at all?

Continue to Exercise 3! 

---

## Exercise 3: Filter the DataFrame 

You have reached the last section of cleaning and preparing the data 🎊


In this exercise, you filter, cast, and add default values to the necessary fields using Spark functionality.

You are going to use the DataFrame you create here in chapters 2, 3, and 4! 📙

Follow the instructions. For any questions, please use 👉 the Q&A chat.  

---

Start with casting:
Run the next commands:

In [None]:
from pyspark.sql.types import IntegerType, BooleanType

casted_df = data_df.withColumn("friends_count", data_df["friends_count"].cast(IntegerType()))
casted_df = casted_df.withColumn("listed_count", casted_df["listed_count"].cast(IntegerType()))
casted_df = casted_df.withColumn("favourites_count", casted_df["favourites_count"].cast(IntegerType()))
casted_df = casted_df.withColumn("statuses_count", casted_df["statuses_count"].cast(IntegerType()))
casted_df = casted_df.withColumn("verified", casted_df["verified"].cast(BooleanType()))
casted_df = casted_df.withColumn("default_profile", casted_df["default_profile"].cast(BooleanType()))
casted_df = casted_df.withColumn("has_extended_profile", casted_df["has_extended_profile"].cast(BooleanType()))
casted_df = casted_df.withColumn("default_profile_image", casted_df["default_profile_image"].cast(BooleanType()))

What happened here? Check it out:

In [None]:
casted_df.limit(5).toPandas()


In [None]:
# Happy with the results? Run this:
df = casted_df 

✅ **Task :** 

#### When and withColumn functionality

Let's fix the weird behavior of `id_str` and `id` fields.


Now that we know that `id_str` has some blank values, let's try to fill them with `id` values.

To achieve that, we will use the `when` function:

```python
from pyspark.sql.functions import when
new_df = df.select(when(df['age'].isNull(), 3).otherwise(df['age']))

```

Use `withColumn` to add a new column, or to replace an existing column with the same name:

```python
# withColumn is a DataFrame method, so there is nothing extra to import
new_df = df.withColumn('age2', df.age + 2)

```


Put `when` and `withColumn` together:

```python
new_df = df.withColumn('new_column_name',
                       when(df['column_that_we_check'].isNotNull(), df['column_if_true'])
                       .otherwise(df['column_if_false']))
```


Replace the **age** column in the examples with `id_str` and `id` as needed.


**Remember!** A DataFrame is an immutable object. Each transformation on a DataFrame creates a new DataFrame and returns a pointer to it. Remember to work with your latest DataFrame and validate yourself! 



[Docs for when](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when), 
[Docs for withColumn](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn)
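
Row by row, `when(cond, a).otherwise(b)` behaves like Python's `a if cond else b`. The plain-Python sketch below applies the fill-the-blanks rule to a few hypothetical `(id, id_str)` rows, with `None` standing in for null. In Spark, `f.coalesce(df['id_str'], df['id'])` from `pyspark.sql.functions` expresses the same rule, since it returns its first non-null argument.

```python
# Hypothetical rows: (id, id_str); None stands in for Spark's null
rows = [("1", "1"), ("2", None), (None, "3"), (None, None)]

def fill_id_str(id_value, id_str_value):
    """Row-wise version of when(id_str.isNull(), id).otherwise(id_str)."""
    return id_value if id_str_value is None else id_str_value

filled = [fill_id_str(i, s) for i, s in rows]
print(filled)  # ['1', '2', '3', None]
```

Rows where both columns are null stay null, so a follow-up null check is still useful.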


In [None]:
from pyspark.sql.functions import when
# your code goes here
# test = ..

<details><summary>Hint</summary>
<p>

Use the isNull function with when, like this:
    
```python
when(df['id_str'].isNull(),df['id']).otherwise(df['id_str'])

```  

</p>
</details>

<details><summary>Answer</summary>
<p>
    
```python
from pyspark.sql.functions import when
test = df.withColumn('id_str', when(df['id_str'].isNull(), df['id']).otherwise(df['id_str']))

```  
</p>
</details>

In [None]:
# Happy with the results? Overwrite your old DataFrame with the next command:

df = test

Validate your output! The distinct count for the `id_str` field should now be higher than before.

In [None]:
# validate yourself
df.select("id_str").distinct().count()

---

✅ **Task :** 

#### drop functionality

Drop column `id` with the `drop` function:

```python
new_df = df.drop('column_name')
```

And validate the schema of the new DataFrame:

```python
new_df.schema
```

After validating the new DataFrame, overwrite the reference to clear memory:

```python
df = new_df
```

In [None]:
new_df = df.drop('id')

In [None]:
df = new_df

---

✅ **Task :**  

Drop the next fields:

* `default_profile_image`
* `has_extended_profile`
* `url`
* `created_at`
* `lang`

You can drop the fields one by one, or provide all the fields to drop in one function call:

```python
updated_df = df.drop('age', 'history', 'another_column_to_drop')
```




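**Remember** to validate yourself with the schema and overwrite the DataFrame reference. `drop` silently ignores column names that don't exist, so a typo won't raise an error; checking the schema is how you catch it.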

In [None]:
# run this:

new_df = df.drop('default_profile_image','has_extended_profile','url','created_at','lang')

In [None]:
# What did you get? Happy with the results?

new_df.limit(15).toPandas()

In [None]:
df = new_df

---

✅ **Task :**  

#### Drop duplicates and describe functionality

Sometimes data gets duplicated accidentally. Drop all duplicate rows with the
`dropDuplicates` function:

```python
new_df = df.dropDuplicates()
```
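`dropDuplicates()` with no arguments treats two rows as duplicates only when every column matches; passing a list of column names, for example `df.dropDuplicates(['screen_name'])`, compares only those columns. The plain-Python sketch below mimics the no-argument version on hypothetical rows (tuples). Unlike this sketch, Spark does not guarantee the order of the remaining rows.

```python
def drop_duplicates(rows):
    """Keep the first occurrence of each fully identical row."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique

# Hypothetical rows: (screen_name, followers_count, bot)
rows = [("alice", 10, 0), ("bob", 5, 1), ("alice", 10, 0), ("alice", 12, 0)]
deduped = drop_duplicates(rows)
print(len(rows), len(deduped))  # 4 3
```

With a `['screen_name']` subset, the last row would also be dropped, since only `screen_name` would be compared.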

In [None]:
# new_df = ... your code goes here
new_df.count()

Use `describe` to see what the data looks like now.
Remember that `mean` and `stddev` are only meaningful for numeric columns.


Use the next code sample and adjust it to your needs:
```python
new_df.describe('column_name').show()
```

In [None]:
new_df.describe('favourites_count').show()

In [None]:
# Happy with the results?

df = new_df

The Machine Learning algorithm you are going to use doesn't take text/string as input. 

Hence, we convert string columns to boolean or numeric ones.


Turn all string columns into boolean or numeric columns; if that isn't possible, drop them.

---

✅ **Task :** 

Most of our data can be translated to _Integer_: 1 if a value exists, 0 if it doesn't.

Implement that logic for the next columns:

* location
* status
* screen_name
* name

Code sample:
```python
df = df.withColumn('column_name',when(df['column_name'].isNull(),0).otherwise(1))
```
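A plain-Python sketch of the same 1/0 rule helps predict edge cases: `isNull()` is true only for real nulls, so an empty or whitespace-only string still counts as "exists" (1). The `exists_flag` helper and the sample values are hypothetical.

```python
def exists_flag(value):
    """Row-wise version of when(col.isNull(), 0).otherwise(1)."""
    return 0 if value is None else 1

# Hypothetical location values; only a real null maps to 0
locations = ["Tel Aviv", None, "", "   "]
print([exists_flag(v) for v in locations])  # [1, 0, 1, 1]
```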

Run the next command to make it happen! 
    

In [None]:
df = df.withColumn('location',when(df['location'].isNull(),0).otherwise(1))
df = df.withColumn('status',when(df['status'].isNull(),0).otherwise(1))
df = df.withColumn('screen_name',when(df['screen_name'].isNull(),0).otherwise(1))
df = df.withColumn('name',when(df['name'].isNull(),0).otherwise(1))

---

✅ Task :

Adapt the `bot` column. 

`bot` is the data classification column, which indicates whether the row represents a bot or not. 

1. Cast it into Integer.
2. Set 1 or 0: 1 for a bot and 0 for a non-bot.

If we don't know what it is, use 0.

Run the next commands, and remember to validate yourself!

In [None]:
# 1. cast
df_bot = df.withColumn('bot',df['bot'].cast(IntegerType()))
df_bot.limit(5).toPandas()

In [None]:
# 2. set 1 or 0
df_bot = df_bot.withColumn('bot',when(df_bot['bot'].isNull(),0).otherwise(df_bot['bot']))
df_bot.limit(5).toPandas()

Do the same with the other boolean fields.
Run the next commands:

In [None]:
df_bot = df_bot.withColumn('verified', df_bot['verified'].cast(IntegerType()))
df_bot = df_bot.withColumn('default_profile', df_bot['default_profile'].cast(IntegerType()))

df_bot = df_bot.withColumn('verified', when(df_bot['verified'].isNull(), 0).otherwise(df_bot['verified']))
df_bot = df_bot.withColumn('default_profile', when(df_bot['default_profile'].isNull(), 0).otherwise(df_bot['default_profile']))

df_bot.limit(5).toPandas()

How many bots and non-bots do we have in the data?

Run the next command to check out! 

In [None]:
df_bot.where(df_bot['bot'] == 0).count()

In [None]:
df_bot.where(df_bot['bot'] == 1).count()

In [None]:
#Happy with the results? 

df = df_bot
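
The two `where(...).count()` calls scan the data twice. `df.groupBy('bot').count()` returns the count for every label in a single pass, which also makes an unexpected label easy to spot. The plain-Python sketch below shows the same idea with `collections.Counter` on a made-up label list.

```python
from collections import Counter

# Hypothetical bot labels after the cast and the null-to-0 step
labels = [1, 0, 0, 1, 0, 0]

counts = Counter(labels)              # like df.groupBy('bot').count()
bot_share = counts[1] / len(labels)   # fraction of rows labeled as bots
print(dict(counts), round(bot_share, 2))  # {1: 2, 0: 4} 0.33
```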

---

✅ **Task :** 

#### drop N/A functionality - dropna()


`dropna` drops rows that contain null values; with `subset`, only the listed columns are checked.


1. Drop the `id_str` column completely
2. Drop rows with N/A for `description`:

Code example:
```python
df_new = df.drop('id_str')

# in order to avoid errors, drop rows with null/None or N/A for description
df_new = df_new.dropna(subset=['description'])
# validate yourself
df_new.count()

```
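By default, `dropna` uses `how='any'`: a row is dropped if any of the checked columns is null, and with `subset` only the listed columns are checked. The plain-Python sketch below mimics `df.dropna(subset=['description'])` on hypothetical rows (dicts); the `dropna` helper here is a stand-in, not Spark's implementation.

```python
def dropna(rows, subset):
    """Keep rows whose subset columns are all non-null (how='any')."""
    return [r for r in rows if all(r[c] is not None for c in subset)]

# Hypothetical rows; friends_count is null in one row but is not in the subset
rows = [
    {"description": "I post news", "friends_count": None},
    {"description": None, "friends_count": 12},
    {"description": "bot account", "friends_count": 3},
]
kept = dropna(rows, subset=["description"])
print(len(kept))  # 2
```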


In [None]:
# df_new = your code goes here

In [None]:
# Happy with the results?

df = df_new

In [None]:
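# Run the next command, we will need it for chapter number 4
sub = df.selectExpr('description', 'bot as label')
# 'overwrite' lets you rerun this cell without failing because the output path already exists
sub.write.mode('overwrite').parquet("train_data_only_description")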


**Save updated DataFrame to file**

To speed up queries and keep the schema information, save the DataFrame as a Parquet file. 

>Parquet is a columnar file format, supported by many data processing systems, that provides optimizations to speed up queries. It is typically far more efficient than CSV or JSON.


---

✅ **Task :** 

#### User Defined Function - udf functionality

`description` is the only string column left.
Spark's pattern mining algorithm takes an Array of unique Strings as input.

Hence, to run pattern mining on `description`, you implement a function that takes the `description` string
and turns it into an Array of unique Strings.

To do that, you will create a UDF (user-defined function).



Code example for guidance:

```python
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf

def split_and_set(some_str):
    # your python code goes here: return a list of the unique words in some_str
    ...

# connect everything together: 
# set the udf
list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))

# call udf from withColumn:
new_df = df.withColumn('description', list_udf(df['description']))

#validate yourself!
new_df.take(2)

#all good?
df = new_df
```


You might get errors. In the exception stack trace,
search for `AnalysisException` and try to understand the problem.

Think about what will happen if you run the code twice.

Assign the result to a new DataFrame pointer so you won't lose your results.


<details><summary>Answer</summary>
<p>

```python
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf

def split_and_set(some_str):
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str


list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))

new_df = df.withColumn('description', list_udf(df['description']))
df = new_df
```
    
</p>
</details>
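
One thing to watch in the answer above: removing `,` without inserting a space glues neighboring words together, so `'lol,a'` becomes `'lola'`. Also, `split(' ')` produces empty strings when there are repeated spaces, and `set` ordering is arbitrary. The sketch below is a hypothetical variant, not the chapter's official answer: it replaces the same punctuation characters with spaces, uses `split()` with no argument to skip empty tokens, and sorts the result so the output is deterministic. It is plain Python, so you can test it before wrapping it with `udf(..., ArrayType(StringType()))` as in the answer. As an alternative that avoids a Python UDF, Spark's built-in `split` and `array_distinct` functions can do a similar job.

```python
PUNCTUATION = "[](){}<>,'/."  # same characters the answer strips

def split_and_set_v2(some_str):
    """Hypothetical variant: punctuation -> space, drop empty tokens, sort for stable output."""
    if not isinstance(some_str, str):
        return some_str  # leave None (or an already-split list) unchanged
    cleaned = ''.join(' ' if c in PUNCTUATION else c for c in some_str)
    return sorted(set(cleaned.split()))

print(split_and_set_v2('[csds b lol,a]'))  # ['a', 'b', 'csds', 'lol']
print(split_and_set_v2('hi  hi there'))    # ['hi', 'there']
print(split_and_set_v2(None))              # None
```

Because non-string input is returned unchanged, calling the function twice (as the next cell does with the original) is harmless.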


In [None]:
# check out this python code. Run it. What did you get?
# will this work for the task?
# how do you combine it with UDF?

def split_and_set(some_str):
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str

some_str = '[csds b lol,a]'

split_and_set(split_and_set(some_str))

In [None]:
# your code goes here




# Validate yourself and save to Parquet!


Before saving the DataFrame to Parquet, look at a sample of the data to validate it.

In [None]:
df.limit(5).toPandas()

### Happy with the results? Save updated DataFrame to file

In [None]:
# happy with the results? write to file!
# run this command
df.write.mode('overwrite').parquet("final_train_data")

Check out the `final_train_data` directory in the Jupyter Files view.

What file format did you get? **Write** it in our chat! 

# Well Done! 👏👏👏


## You just finished:  Intro to Data Cleaning and Preparation 


## Next Chapter:  Apache Spark ML - Create train and test set 