# ⭐ Scaling Machine Learning in Three Weeks course - Week 2:
##  PySpark - data marshaling

In this exercise, you will use:
 * Mock data
 * Bot data set
 * DataFrame
 * Spark SQL
 * Spark Summary



This exercise is part of the [Scaling Machine Learning with Spark book](https://learning.oreilly.com/library/view/scaling-machine-learning/9781098106812/)
available on the O'Reilly platform or on [Amazon](https://amzn.to/3WgHQvd).


In [8]:
# Create SparkSession from builder
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('Scalling_ml_with_spark-week_2') \
                    .getOrCreate()


## ✅ **Task 1 :** Split text

How would you go about taking text and turning it into words using PySpark functionality?

Have a look at `Tokenizer` and `RegexTokenizer`.
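
To build intuition for the regular-expression side before reaching for Spark, here is a plain-Python sketch (using the standard `re` module, not PySpark) of the two ways a pattern can be used: to describe the gaps between tokens, or to describe the tokens themselves. The sample string comes from the exercise data; the variable names are only illustrative.

```python
import re

sentence = "Hi|I|heard|about|Spark"

# Pattern describes the separators: split wherever a non-word character appears,
# then drop any empty strings left between consecutive separators.
split_on_gaps = [t for t in re.split(r"\W", sentence) if t]

# Pattern describes the tokens: collect every run of word characters.
match_tokens = re.findall(r"\w+", sentence)

print(split_on_gaps)
print(match_tokens)
```

Spark's `RegexTokenizer` offers both modes through its `gaps` parameter, and it also lowercases tokens by default, which this sketch does not do.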

<br>

<details><summary> Click here to see the Solution </summary>
<p>

```python
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps=False

# UDF that counts the tokens in each row
countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
```
</p>
</details>




In [9]:
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi|I|heard|about|Spark"),
    (1, "I     wish Java      could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])


# your solution goes here
# ...

## ✅ **Task 2:** words into numerical vectors

How can you take a string of text and turn it into a numerical vector representation of its words? Spark has dedicated functionality for that: `Word2Vec`.

Take the `documentDF` and use `Word2Vec` to turn each row of text into a numerical vector.

<br>

<details><summary> Click here to see the Solution</summary>
<p>
    

    
```python
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
    
```
</p>
</details>




In [None]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Your answer goes here ...
# ...

## ✅ **Task 3:** stop words

How can you take a document of words and remove the noisy stop words?

Take `sentenceData` and use `StopWordsRemover` to filter the stop words out of the `raw` column.

<br>

<details><summary> Click here to see the Solution</summary>
<p>
    
here:
    
```python
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
```
</p>
</details>




In [None]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])


# your solution goes here
# ...


## ✅ **Task 4:** Binarized features

Sometimes vectorized features need to go through a binarization process to better fit the problem domain.

Take a continuous column and use `Binarizer` to transform the feature into a binary one.
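
As a rough mental model (plain Python, not the Spark API), binarization compares each value against a threshold: values strictly greater than the threshold become 1.0 and the rest become 0.0. The values below mirror the exercise data, and the threshold of 0.5 is an assumption chosen only for illustration.

```python
values = [5.1, 5.8, 0.2]
threshold = 0.5  # assumed threshold, chosen only for illustration

# Values strictly above the threshold map to 1.0, everything else to 0.0.
binarized = [1.0 if v > threshold else 0.0 for v in values]

print(binarized)
```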

<br>

<details><summary> Click here to see the Solution</summary>
<p>
    
here:
    
```python
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
```
</p>
</details>



In [None]:
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 5.1),
    (1, 5.8),
    (2, 0.2)
], ["id", "feature"])

# your solution goes here
# ...


### ✅ **Task 5 :**  Load the bot data & marshal it

In [None]:
df = spark.read.csv('../datasets/bot_data.csv', header=True)

#### Get a reminder of what this data looks like:

Look at 2 records from the DataFrame to understand the values better before filtering. Use the `take()` function:

`df.take(insert an integer here)`

In [None]:
df.take(2)

In [None]:
df.printSchema()

In [None]:
df.limit(25).toPandas()

How many rows have missing values? Run the next command to find out!

```python
# functools is part of the Python 3 standard library.
import pyspark.sql.functions as f
from functools import reduce
df.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))).count()
```

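`reduce` is part of `functools`. It takes a function of two arguments, `x` and `y`, and applies it cumulatively to the items of an iterable. In our case the function is `x | y`, where `|` is the OR operator for Spark columns, so the `isNull` checks of all the columns are chained together with OR.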

For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates ((((1+2)+3)+4)+5)
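
The same folding idea can be tried in plain Python before applying it to Spark columns. This sketch folds first with `+` and then with `|` over per-field null checks; the `row` dictionary is a made-up stand-in for a single record, not part of the bot data set.

```python
from functools import reduce

# Folding with + reproduces ((((1+2)+3)+4)+5).
total = reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])

# A hypothetical record; None plays the role of a missing value.
row = {"id": "1", "screen_name": None, "followers_count": "42"}

# Folding with | answers "is any field missing?" for this record.
has_missing = reduce(lambda x, y: x | y, (value is None for value in row.values()))

print(total, has_missing)
```

With Spark, the items being folded are column expressions rather than booleans, so the result is one combined condition that `where` can evaluate for every row.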

Run only the `reduce` expression and check the output:

```reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))```




In [None]:
# your solution goes here
# ...

You created a chain of IS NULL checks for all the columns, joined by OR operators!

Now, put it together:

In [None]:
import pyspark.sql.functions as f
from functools import reduce

reducePhrase = reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))

df.where(reducePhrase).count()

#### Distinct Value
Get the count of distinct id values; it should be equal to the number of rows in the data.

Try both id and id_str fields.

Use the next code and adjust it according to the field:

```python
df.select("field_name").distinct().count()
```
What happened here? Is it the same as the size of the data set? Don't worry; we will fix that soon!

In [None]:
# your solution goes here
# ...

#### Is Null

How many rows have null in the `screen_name` column?

Use `where` with the column's `isNull` function to get a DataFrame containing only the rows where `column_name` is null.

Count it! Use the `count` method for that.

Code sample:

```python
df.where(f.col('column_name').isNull()).count()
```


In [None]:
# your solution goes here
# ...


#### Standard Deviation
As part of the data exploration phase, the standard deviation (stddev) is a must!

Calculate stddev for followers_count.

#### Notice!
Some rows have None/Null for followers_count. We can:

1. Ignore them and not calculate the stddev for them,
2. Give them a default value, or
3. Filter them entirely out of our training data.

Start by counting how many rows have null for followers_count:

Run this:
```python
df.where(f.col('followers_count').isNull()).count()
```

In [None]:
# your solution goes here
# ...

We go with option 2: give them a default value.

Fill the null cells with a default value using [fillna](https://nbviewer.org/github/Learn-Apache-Spark/SparkML/blob/master/notebooks/Solution/Solution%201%20-%20Intro%20to%20Data%20Cleaning%20and%20Preparation.ipynb#:~:text=default%20value%3A%20Using-,fillna,-Notice%20the%20matching).

Notice the matching type requirement: if a column is of type string, the default value must also be of type string. At the moment, all our fields are of type string.

Code sample:

```python
df_defaultvalue = df.fillna({'column_name':'0'})
```

<details><summary>Solution</summary>
<p>

```python
df_defaultvalue = df.fillna({'followers_count':'0'})
```
</p>
</details>

Remember to validate your result with count:

```python
df_defaultvalue.where(f.col('followers_count').isNull()).count()
```


In [None]:
# your solution goes here
# ...

2nd phase of **standard deviation** calculation is:

Casting data to numbers!

Cast it to integer:

In the code sample, replace the `column_name` with `followers_count`:
```python
from pyspark.sql.types import IntegerType

data_df = df_defaultvalue.withColumn("column_name", df_defaultvalue["column_name"].cast(IntegerType()))
```
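
The choice between ignoring nulls and filling them with a default is not neutral: it changes the statistic you are about to compute. A small plain-Python sketch with made-up numbers illustrates this, using `statistics.stdev` (the sample standard deviation) in place of Spark.

```python
from statistics import stdev

# Hypothetical follower counts; None marks a missing value.
followers = [10, 20, None, 30]

# Option 1: ignore missing values.
ignored = stdev([v for v in followers if v is not None])

# Option 2: replace missing values with a default of 0.
filled = stdev([0 if v is None else v for v in followers])

print(ignored, filled)
```

Filling with 0 pulls the mean down and widens the spread, so keep that in mind when reading the `stddev` of `followers_count`.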

### ✅ **Task 7 :**  Gather more statistics to better understand the data

Use `pyspark.sql.functions` methods, [here are the docs](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions).
                                                    

Check out the `describe` functionality. It provides the `count`, `mean`, `stddev`, `min` and `max` calculations in one call!

*Remember* - Use the last DataFrame that you created, with the casting and default values.

`describe` can take specific fields, or calculate statistics for all fields when called without arguments.

Code Example:
```python
df.describe(['age']).show()
df.describe().toPandas().transpose()
```
In the code example, change `age` to `followers_count` and run it!

In [None]:
# your solution goes here
# ...

### ✅ **Task 8 :**  Continue marshaling the bot data

Adapt `bot` column.

`bot` is the classification (label) column, which indicates whether the row represents a bot or not.

1. Cast it into Integer.
2. Set 1 or 0: 1 for bot and 0 for non-bot.
   If we don't know what it is, use 0.

Run the next commands, and remember to validate your result!
Make sure to use the right DataFrame.

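```python
df = df.withColumn('bot', df['bot'].cast(IntegerType()))
# Unknown labels (nulls after the cast) default to 0, as described above
df = df.fillna({'bot': 0})
df.limit(5).toPandas()
```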

In [None]:
# your solution goes here
# ...

In [None]:
Now you should have a better understanding of the data. Let's drop irrelevant columns:

Run the next commands:
    
```python
# Dropping irrelevant columns and duplicates
df = df.drop('default_profile_image','has_extended_profile','url','created_at','lang','id','id_str')
# The next steps refer to the de-duplicated DataFrame as df_test
df_test = df.dropDuplicates()
```

In [None]:
from pyspark.sql.types import IntegerType, ArrayType, BooleanType, StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import when

# your solution goes here
# ...

Next, cast and transform the data into the decided upon column types.

Run the next commands and break them down into transformations so it will be easier for you to follow along:

```python
# First Transformation
df_test = df_test.withColumn("friends_count", df_test["friends_count"].cast(IntegerType()))
df_test = df_test.withColumn("listed_count", df_test["listed_count"].cast(IntegerType()))
df_test = df_test.withColumn("favourites_count", df_test["favourites_count"].cast(IntegerType()))
df_test = df_test.withColumn("statuses_count", df_test["statuses_count"].cast(IntegerType()))
df_test = df_test.withColumn("verified", df_test["verified"].cast(BooleanType()))
df_test = df_test.withColumn("default_profile", df_test["default_profile"].cast(BooleanType()))

# Second Transformation
df_test = df_test.withColumn('default_profile',df_test['default_profile'].cast(IntegerType()))
df_test = df_test.withColumn('name',when(df_test['name'].isNull(),0).otherwise(1))
df_test = df_test.withColumn('verified',df_test['verified'].cast(IntegerType()))

# Third Transformation
df_test = df_test.withColumn('verified',when(df_test['verified'].isNull(),0).otherwise(df_test['verified']))
df_test = df_test.withColumn('default_profile',when(df_test['default_profile'].isNull(),0).otherwise(df_test['default_profile']))
df_test = df_test.withColumn('location',when(df_test['location'].isNull(),0).otherwise(1))
df_test = df_test.withColumn('status',when(df_test['status'].isNull(),0).otherwise(1))
df_test = df_test.withColumn('screen_name',when(df_test['screen_name'].isNull(),0).otherwise(1))

# Fourth Transformation
df_test = df_test.dropna(subset=['description'])

def split_and_set(some_str):
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str

list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))
df_test = df_test.withColumn('description', list_udf(df_test['description']))

# Fifth Transformation - fill NA:
df_test = df_test.fillna({'followers_count':0,'statuses_count':0,'favourites_count':0,'listed_count':0,'friends_count':0,})
```
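
To see what `split_and_set` does on its own, outside Spark, it can be called directly on an ordinary string. The function body is repeated here so the snippet runs by itself; the sample description is made up for illustration.

```python
def split_and_set(some_str):
    # Same logic as the function above, repeated so this snippet is self-contained.
    if isinstance(some_str, str):
        # Strip brackets and punctuation, then keep each distinct word once.
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str

# Hypothetical description text with punctuation and a repeated word.
words = split_and_set("Bot (auto), tweets. Bot")
print(sorted(words))

# Non-string input (for example None) is returned unchanged.
print(split_and_set(None))
```

Because the words pass through a `set`, duplicates are removed and the original word order is not preserved, which is why the example sorts the result before printing.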

In [None]:
# your solution goes here
# ...

Save this data to Parquet so it is easier to work with later:

```python
df_test.write.parquet("marshalled_data")
```

In [None]:
# your solution goes here
# ...

#### Well Done! 👏👏👏
You just finished: Marshaling the data!