**For more information about those session below, access: [this url](https://www.youtube.com/watch?v=_C8kWso4ne4&ab_channel=freeCodeCamp.org)**

## Part 1
    - PySpark DataFrame
    - Reading the dataset
    - Checking the datatype
    - Selecting columns and indexing
    - Check describe just like pandas
    - Adding/Dropping columns

In [4]:
import numpy as np

import pyspark.sql.functions as sf
from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate()
spark

In [58]:
# read the dataset
df = spark.read.csv('./imdb_ratings.csv', header=True, inferSchema=True)
df.show(2)

+-----------+--------------------+--------------+-----+--------+--------------------+
|star_rating|               title|content_rating|genre|duration|         actors_list|
+-----------+--------------------+--------------+-----+--------+--------------------+
|        9.3|The Shawshank Red...|             R|Crime|     142|[u'Tim Robbins', ...|
|        9.2|       The Godfather|             R|Crime|     175|[u'Marlon Brando'...|
+-----------+--------------------+--------------+-----+--------+--------------------+
only showing top 2 rows



In [59]:
# pd.df.info()
df.printSchema()

root
 |-- star_rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- content_rating: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- actors_list: string (nullable = true)



In [60]:
# same thing with pd.df.head(2)
df.head(2)

[Row(star_rating=9.3, title='The Shawshank Redemption', content_rating='R', genre='Crime', duration=142, actors_list="[u'Tim Robbins', u'Morgan Freeman', u'Bob Gunton']"),
 Row(star_rating=9.2, title='The Godfather', content_rating='R', genre='Crime', duration=175, actors_list="[u'Marlon Brando', u'Al Pacino', u'James Caan']")]

In [61]:
# select column(s), but return a dataframe
# you can see the data in dataframe
star_rating = df.select('star_rating')
title_genre = df.select(['title', 'genre'])
title_genre.show(2)

+--------------------+-----+
|               title|genre|
+--------------------+-----+
|The Shawshank Red...|Crime|
|       The Godfather|Crime|
+--------------------+-----+
only showing top 2 rows



In [62]:
# select column(s) and return a column or a dataframe
print(df['star_rating']) # return a column, can you .show() to see the data
print(df[['star_rating', 'genre']]) # return a dataframe, just use .show() to see the data

Column<'star_rating'>
DataFrame[star_rating: double, genre: string]


In [63]:
# hiển thị kiểu dữ liệu của các cột 
df.dtypes

[('star_rating', 'double'),
 ('title', 'string'),
 ('content_rating', 'string'),
 ('genre', 'string'),
 ('duration', 'int'),
 ('actors_list', 'string')]

In [64]:
df.describe().show()

+-------+-------------------+--------------------+--------------+-------+------------------+--------------------+
|summary|        star_rating|               title|content_rating|  genre|          duration|         actors_list|
+-------+-------------------+--------------------+--------------+-------+------------------+--------------------+
|  count|                979|                 979|           976|    979|               979|                 979|
|   mean| 7.8897854954034985|               796.0|          null|   null|120.97957099080695|                null|
| stddev|0.33606932614795176|   1090.190808987124|          null|   null|26.218009846412077|                null|
|    min|                7.4|(500) Days of Summer|      APPROVED| Action|                64|"[u""Brian O'Hall...|
|    max|                9.3|               [Rec]|             X|Western|               242|[u'Zooey Deschane...|
+-------+-------------------+--------------------+--------------+-------+---------------

In [65]:
df.count(), len(df.columns)

(979, 6)

In [77]:
# add column: luôn luôn phải đính kèm giá trị 
            # hoặc sửa data trong column đó (chuẩn hóa data)
            # link chuẩn hóa đã được bookmark
df = df.withColumn('hello', df['duration'] * 2 + df['duration'])
df = df.withColumn('hola', sf.lit(69))
df.show(2)

+-----------+--------------------+--------------+-----+--------+--------------------+----+-----+
|star_rating|               title|content_rating|genre|duration|         actors_list|hola|hello|
+-----------+--------------------+--------------+-----+--------+--------------------+----+-----+
|        9.3|The Shawshank Red...|             R|Crime|     142|[u'Tim Robbins', ...|  69|  426|
|        9.2|       The Godfather|             R|Crime|     175|[u'Marlon Brando'...|  69|  525|
+-----------+--------------------+--------------+-----+--------+--------------------+----+-----+
only showing top 2 rows



In [82]:
# drop column
df = df.drop('hello', 'hola')
df.show(2)

+-----------+--------------------+--------------+-----+--------+--------------------+
|star_rating|               title|content_rating|genre|duration|         actors_list|
+-----------+--------------------+--------------+-----+--------+--------------------+
|        9.3|The Shawshank Red...|             R|Crime|     142|[u'Tim Robbins', ...|
|        9.2|       The Godfather|             R|Crime|     175|[u'Marlon Brando'...|
+-----------+--------------------+--------------+-----+--------+--------------------+
only showing top 2 rows



In [84]:
df = df.withColumnRenamed('duration', 'still duration')
df.show(2)

+-----------+--------------------+--------------+-----+--------------+--------------------+
|star_rating|               title|content_rating|genre|still duration|         actors_list|
+-----------+--------------------+--------------+-----+--------------+--------------------+
|        9.3|The Shawshank Red...|             R|Crime|           142|[u'Tim Robbins', ...|
|        9.2|       The Godfather|             R|Crime|           175|[u'Marlon Brando'...|
+-----------+--------------------+--------------+-----+--------------+--------------------+
only showing top 2 rows



## Part 2: Handling with missing value

    - Dropping columns (missing >= 50%)
    - Dropping rows (missing >= 50%)
    - bla
    - Handling missing value by mean, median, mode,...

In [None]:
import numpy as np

import pyspark.sql.functions as sf
from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate()
spark

In [61]:
data = [
    (1,1, 1, 3.0, 1, '1'),
    (2,1, 2, float(5), 2, '2'),
    (3,8, 3, np.nan, None, None),
    (4,None, 4, None, None, None),
    (5,3, 5, float(10), None, None),
    (6,4, 6, float("nan"), None, None),
    (7,2, None, float("nan"), None, None),
]
df = spark.createDataFrame(data, ("id", "session", "timestamp1", "id2", "None value", "None value 1"))
df.show()

+---+-------+----------+----+----------+------------+
| id|session|timestamp1| id2|None value|None value 1|
+---+-------+----------+----+----------+------------+
|  1|      1|         1| 3.0|         1|           1|
|  2|      1|         2| 5.0|         2|           2|
|  3|      8|         3| NaN|      null|        null|
|  4|   null|         4|null|      null|        null|
|  5|      3|         5|10.0|      null|        null|
|  6|      4|         6| NaN|      null|        null|
|  7|      2|      null| NaN|      null|        null|
+---+-------+----------+----+----------+------------+



In [50]:
# drop all rows that contain NULL or NaN
df.na.drop().show()

+---+-------+----------+---+----------+------------+
| id|session|timestamp1|id2|None value|None value 1|
+---+-------+----------+---+----------+------------+
|  1|      1|         1|3.0|         1|           1|
|  2|      1|         2|5.0|         2|           2|
+---+-------+----------+---+----------+------------+



In [51]:
# xóa các dòng có số attribute null >= (len(df.columns)//2+1)
df.na.drop(thresh=len(df.columns)//2+1).show()

+---+-------+----------+----+----------+------------+
| id|session|timestamp1| id2|None value|None value 1|
+---+-------+----------+----+----------+------------+
|  1|      1|         1| 3.0|         1|           1|
|  2|      1|         2| 5.0|         2|           2|
|  5|      3|         5|10.0|      null|        null|
+---+-------+----------+----+----------+------------+



In [52]:
# xóa các dòng có session và id2 chứa null (chỉ xét trong 2 cột này)
df.na.drop(subset=['session', 'id2'], how='any').show()

+---+-------+----------+----+----------+------------+
| id|session|timestamp1| id2|None value|None value 1|
+---+-------+----------+----+----------+------------+
|  1|      1|         1| 3.0|         1|           1|
|  2|      1|         2| 5.0|         2|           2|
|  5|      3|         5|10.0|      null|        null|
+---+-------+----------+----+----------+------------+



In [62]:
# xóa các cột có trên 50% là null???
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- session: long (nullable = true)
 |-- timestamp1: long (nullable = true)
 |-- id2: double (nullable = true)
 |-- None value: long (nullable = true)
 |-- None value 1: string (nullable = true)



In [66]:
# fill missing value with specific value
# nhớ để ý kiểu dữ liệu: điền khuyết bằng string sẽ chỉ áp dụng cho các cột string, 
#                        điền số sẽ chỉ áp cho các cột số
df.na.fill(10000).show()

+---+-------+----------+-------+----------+------------+
| id|session|timestamp1|    id2|None value|None value 1|
+---+-------+----------+-------+----------+------------+
|  1|      1|         1|    3.0|         1|           1|
|  2|      1|         2|    5.0|         2|           2|
|  3|      8|         3|10000.0|     10000|        null|
|  4|  10000|         4|10000.0|     10000|        null|
|  5|      3|         5|   10.0|     10000|        null|
|  6|      4|         6|10000.0|     10000|        null|
|  7|      2|     10000|10000.0|     10000|        null|
+---+-------+----------+-------+----------+------------+



In [78]:
df.show()

+---+-------+----------+----+----------+------------+
| id|session|timestamp1| id2|None value|None value 1|
+---+-------+----------+----+----------+------------+
|  1|      1|         1| 3.0|         1|           1|
|  2|      1|         2| 5.0|         2|           2|
|  3|      8|         3| NaN|      null|        null|
|  4|   null|         4|null|      null|        null|
|  5|      3|         5|10.0|      null|        null|
|  6|      4|         6| NaN|      null|        null|
|  7|      2|      null| NaN|      null|        null|
+---+-------+----------+----+----------+------------+



In [90]:
from pyspark.ml.feature import Imputer

# điền khuyết cho một vài cột nhất định
# session, id2 = mean/mode/median
imputer = Imputer(inputCols=['session', 'id2'], 
                  outputCols=[f'{c}_' for c in ['session', 'id2']], 
                  strategy='median') # mode, median,...
imputer.fit(df).transform(df).show()

+---+-------+----------+----+----------+------------+--------+----+
| id|session|timestamp1| id2|None value|None value 1|session_|id2_|
+---+-------+----------+----+----------+------------+--------+----+
|  1|      1|         1| 3.0|         1|           1|       1| 3.0|
|  2|      1|         2| 5.0|         2|           2|       1| 5.0|
|  3|      8|         3| NaN|      null|        null|       8| 5.0|
|  4|   null|         4|null|      null|        null|       2| 5.0|
|  5|      3|         5|10.0|      null|        null|       3|10.0|
|  6|      4|         6| NaN|      null|        null|       4| 5.0|
|  7|      2|      null| NaN|      null|        null|       2| 5.0|
+---+-------+----------+----+----------+------------+--------+----+



## Part 3 - Filter

    - Filter
    - &, |, ==
    - ~

In [None]:
import numpy as np

import pyspark.sql.functions as sf
from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate()
spark

In [91]:
df = spark.read.csv('./imdb_ratings.csv', header=True, inferSchema=True)
df.show(3)

+-----------+--------------------+--------------+-----+--------+--------------------+
|star_rating|               title|content_rating|genre|duration|         actors_list|
+-----------+--------------------+--------------+-----+--------+--------------------+
|        9.3|The Shawshank Red...|             R|Crime|     142|[u'Tim Robbins', ...|
|        9.2|       The Godfather|             R|Crime|     175|[u'Marlon Brando'...|
|        9.1|The Godfather: Pa...|             R|Crime|     200|[u'Al Pacino', u'...|
+-----------+--------------------+--------------+-----+--------+--------------------+
only showing top 3 rows



In [105]:
# find the movies whose durations >= 200
# return a dataframe 
df.filter(~(df['duration']>=200) & (df['genre']=='Crime') | (df.star_rating==9.)).show()

+-----------+--------------------+--------------+------+--------+--------------------+
|star_rating|               title|content_rating| genre|duration|         actors_list|
+-----------+--------------------+--------------+------+--------+--------------------+
|        9.3|The Shawshank Red...|             R| Crime|     142|[u'Tim Robbins', ...|
|        9.2|       The Godfather|             R| Crime|     175|[u'Marlon Brando'...|
|        9.0|     The Dark Knight|         PG-13|Action|     152|[u'Christian Bale...|
|        8.9|        Pulp Fiction|             R| Crime|     154|[u'John Travolta'...|
+-----------+--------------------+--------------+------+--------+--------------------+



In [100]:
type(df.show())

+-----------+--------------------+--------------+---------+--------+--------------------+
|star_rating|               title|content_rating|    genre|duration|         actors_list|
+-----------+--------------------+--------------+---------+--------+--------------------+
|        9.3|The Shawshank Red...|             R|    Crime|     142|[u'Tim Robbins', ...|
|        9.2|       The Godfather|             R|    Crime|     175|[u'Marlon Brando'...|
|        9.1|The Godfather: Pa...|             R|    Crime|     200|[u'Al Pacino', u'...|
|        9.0|     The Dark Knight|         PG-13|   Action|     152|[u'Christian Bale...|
|        8.9|        Pulp Fiction|             R|    Crime|     154|[u'John Travolta'...|
|        8.9|        12 Angry Men|     NOT RATED|    Drama|      96|[u'Henry Fonda', ...|
|        8.9|The Good, the Bad...|     NOT RATED|  Western|     161|[u'Clint Eastwood...|
|        8.9|The Lord of the R...|         PG-13|Adventure|     201|[u'Elijah Wood', ...|
|        8

NoneType

## Work with SQLServer

    - Demo CRUD operations

In [16]:
import numpy as np

import pyspark.sql.functions as sf
from pyspark.sql import * 
spark = SparkSession.builder.getOrCreate()
spark

In [37]:
HOSTNAME = "localhost"
PORT = 1433
USERNAME = "sa"
PASSWORD = "Longhandsome123"
# DB_NAME = "demo"

def createUrl(dbName, hostname=HOSTNAME, port=PORT, username=USERNAME, password=PASSWORD):
    return f"jdbc:sqlserver://{hostname}:{port};database={dbName};user={username};password={password}"

In [87]:
data = [
    (1, 'Nguyen Bao Long', '0919070940'),
    (2, 'Bao Long', '09'),
    (3, 'Nguyen Bao', '0940'),
]
    
# write to a table into demo db (create)
df = spark.createDataFrame(data, ("id", "name", "phone"))
df.printSchema()
df.write.jdbc(url=createUrl(dbName='demo'), table='customer_order', mode='overwrite')

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)



In [36]:
# append a row into demo db (update)
id_customer = 4
name = "Bảo Long Nguyễn"
phone = '01923'

df = spark.createDataFrame([(id_customer, name, phone)], ('id', 'name', 'phone'))
df.write.jdbc(createUrl(dbName='demo'), table='customer_order', mode='append')

In [41]:
# query in table
query = '''
    (SELECT *
    FROM customer_order as co
    WHERE co.id = 1
        OR co.id = 2) t
'''
df = spark.read.jdbc(createUrl(dbName='demo'), table=query)
df.show()

+---+---------------+----------+
| id|           name|     phone|
+---+---------------+----------+
|  2|       Bao Long|        09|
|  1|Nguyen Bao Long|0919070940|
+---+---------------+----------+



In [90]:
# read data to df
df = spark.read.jdbc(createUrl(dbName='demo'), table='customer_order')
df.show()

+---+---------------+----------+
| id|           name|     phone|
+---+---------------+----------+
|  1|Nguyen Bao Long|0919070940|
|  2|       Bao Long|        09|
|  3|     Nguyen Bao|      0940|
+---+---------------+----------+



In [92]:
# update
df.withColumn(
    'id',
    sf.when((df.phone=='0940') | (df.phone=='019'), '69')
    .otherwise(df.phone)
).show()

+----------+---------------+----------+
|        id|           name|     phone|
+----------+---------------+----------+
|0919070940|Nguyen Bao Long|0919070940|
|        09|       Bao Long|        09|
|        69|     Nguyen Bao|      0940|
+----------+---------------+----------+



In [93]:
# delete column
df.drop(df.name).show()

+---+----------+
| id|     phone|
+---+----------+
|  1|0919070940|
|  2|        09|
|  3|      0940|
+---+----------+



In [None]:
# delete row
'''
    - có vẻ như spark không có cơ chế xóa như vậy
    --> giải quyết bằng cách lọc ra những thằng k cần xóa, bỏ những thằng đó vào 1 df mới và thao tác trên df đó
'''