# Practice with Pyspark

### Create session

In [29]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.pandas as ps
from datetime import datetime
import pyarrow
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [5]:
spark = SparkSession.builder.master("local[*]").appName('Practice_For_PySpark').getOrCreate()

### Get data to read

In [6]:
file_path = r'C:\Users\LISA\Learning\What-I-have-learned\Extracting_With_Apis\csv'

anime_data = spark.read.csv( file_path,
    sep = '|',
    header = True,
    )

anime_data.printSchema()
anime_data.show()

root
 |-- index: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- rank: string (nullable = true)

+-----+-----+--------------------+----+
|index|   id|               title|rank|
+-----+-----+--------------------+----+
|    0| 5114|Fullmetal Alchemi...|   1|
|    1|11061|Hunter x Hunter (...|   2|
|    2| 9253|         Steins;Gate|   3|
|    3|   21|           One Piece|   4|
|    4| 1535|          Death Note|   5|
|    5|16498|  Shingeki no Kyojin|   6|
|    6| 1575|Code Geass: Hangy...|   7|
|    7| 1735|  Naruto: Shippuuden|   8|
|    8|   30|Neon Genesis Evan...|   9|
|    9|32281|      Kimi no Na wa.|  10|
|   10|38000|    Kimetsu no Yaiba|  11|
|   11|23273|Shigatsu wa Kimi ...|  12|
|   12|28851|      Koe no Katachi|  13|
|   13|    1|        Cowboy Bebop|  14|
|   14| 2001|Tengen Toppa Gurr...|  15|
|   15|40748| Jujutsu Kaisen (TV)|  16|
|   16| 2904|Code Geass: Hangy...|  17|
|   17|   20|              Naruto|  18|
|   18|205

### Change data type of column 

In [7]:
datatype_edit_schema = [
               StructField('id', IntegerType(), True),
               StructField('index', IntegerType(), True),
               StructField('title', StringType(), True),
               StructField('rank', IntegerType(), True),
                ]

structure = StructType(fields = datatype_edit_schema)

anime_dataf = spark.read.csv(file_path,
    sep = '|',
    header = True,
    schema = structure
    )

anime_dataf.printSchema()
anime_dataf.show()

root
 |-- id: integer (nullable = true)
 |-- index: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rank: integer (nullable = true)

+---+-----+--------------------+----+
| id|index|               title|rank|
+---+-----+--------------------+----+
|  0| 5114|Fullmetal Alchemi...|   1|
|  1|11061|Hunter x Hunter (...|   2|
|  2| 9253|         Steins;Gate|   3|
|  3|   21|           One Piece|   4|
|  4| 1535|          Death Note|   5|
|  5|16498|  Shingeki no Kyojin|   6|
|  6| 1575|Code Geass: Hangy...|   7|
|  7| 1735|  Naruto: Shippuuden|   8|
|  8|   30|Neon Genesis Evan...|   9|
|  9|32281|      Kimi no Na wa.|  10|
| 10|38000|    Kimetsu no Yaiba|  11|
| 11|23273|Shigatsu wa Kimi ...|  12|
| 12|28851|      Koe no Katachi|  13|
| 13|    1|        Cowboy Bebop|  14|
| 14| 2001|Tengen Toppa Gurr...|  15|
| 15|40748| Jujutsu Kaisen (TV)|  16|
| 16| 2904|Code Geass: Hangy...|  17|
| 17|   20|              Naruto|  18|
| 18|20583|           Haikyuu!!|  19|
| 19| 4181|

### Explore the data

In [8]:
anime_dataf.schema

StructType([StructField('id', IntegerType(), True), StructField('index', IntegerType(), True), StructField('title', StringType(), True), StructField('rank', IntegerType(), True)])

In [9]:
anime_dataf.count()

300

In [10]:
anime_dataf.head(5)

[Row(id=0, index=5114, title='Fullmetal Alchemist: Brotherhood', rank=1),
 Row(id=1, index=11061, title='Hunter x Hunter (2011)', rank=2),
 Row(id=2, index=9253, title='Steins;Gate', rank=3),
 Row(id=3, index=21, title='One Piece', rank=4),
 Row(id=4, index=1535, title='Death Note', rank=5)]

### Column manipulation

In [11]:
anime_dataframe = anime_dataf.withColumn("watched", when(anime_dataf.title == 'Kimetsu no Yaiba', 'Yes')
                                            .when(anime_dataf.title == 'Jujutsu Kaisen (TV)', 'Yes')
                                            )
anime_dataframe.show()

+---+-----+--------------------+----+-------+
| id|index|               title|rank|watched|
+---+-----+--------------------+----+-------+
|  0| 5114|Fullmetal Alchemi...|   1|   null|
|  1|11061|Hunter x Hunter (...|   2|   null|
|  2| 9253|         Steins;Gate|   3|   null|
|  3|   21|           One Piece|   4|   null|
|  4| 1535|          Death Note|   5|   null|
|  5|16498|  Shingeki no Kyojin|   6|   null|
|  6| 1575|Code Geass: Hangy...|   7|   null|
|  7| 1735|  Naruto: Shippuuden|   8|   null|
|  8|   30|Neon Genesis Evan...|   9|   null|
|  9|32281|      Kimi no Na wa.|  10|   null|
| 10|38000|    Kimetsu no Yaiba|  11|    Yes|
| 11|23273|Shigatsu wa Kimi ...|  12|   null|
| 12|28851|      Koe no Katachi|  13|   null|
| 13|    1|        Cowboy Bebop|  14|   null|
| 14| 2001|Tengen Toppa Gurr...|  15|   null|
| 15|40748| Jujutsu Kaisen (TV)|  16|    Yes|
| 16| 2904|Code Geass: Hangy...|  17|   null|
| 17|   20|              Naruto|  18|   null|
| 18|20583|           Haikyuu!!|  

In [12]:
anime_dataframe = anime_dataframe.withColumn("date_inputted", when(anime_dataframe.watched.isNull(), None)
                                .when(anime_dataframe.watched == "Yes", datetime.today().strftime('%Y-%m-%d'))
                                 .otherwise(anime_dataframe.watched))
anime_dataframe.show()

+---+-----+--------------------+----+-------+-------------+
| id|index|               title|rank|watched|date_inputted|
+---+-----+--------------------+----+-------+-------------+
|  0| 5114|Fullmetal Alchemi...|   1|   null|         null|
|  1|11061|Hunter x Hunter (...|   2|   null|         null|
|  2| 9253|         Steins;Gate|   3|   null|         null|
|  3|   21|           One Piece|   4|   null|         null|
|  4| 1535|          Death Note|   5|   null|         null|
|  5|16498|  Shingeki no Kyojin|   6|   null|         null|
|  6| 1575|Code Geass: Hangy...|   7|   null|         null|
|  7| 1735|  Naruto: Shippuuden|   8|   null|         null|
|  8|   30|Neon Genesis Evan...|   9|   null|         null|
|  9|32281|      Kimi no Na wa.|  10|   null|         null|
| 10|38000|    Kimetsu no Yaiba|  11|    Yes|   2022-08-03|
| 11|23273|Shigatsu wa Kimi ...|  12|   null|         null|
| 12|28851|      Koe no Katachi|  13|   null|         null|
| 13|    1|        Cowboy Bebop|  14|   

In [13]:
anime_dataframe = anime_dataframe.withColumnRenamed('index', 'mal_index')
anime_dataframe.show()

+---+---------+--------------------+----+-------+-------------+
| id|mal_index|               title|rank|watched|date_inputted|
+---+---------+--------------------+----+-------+-------------+
|  0|     5114|Fullmetal Alchemi...|   1|   null|         null|
|  1|    11061|Hunter x Hunter (...|   2|   null|         null|
|  2|     9253|         Steins;Gate|   3|   null|         null|
|  3|       21|           One Piece|   4|   null|         null|
|  4|     1535|          Death Note|   5|   null|         null|
|  5|    16498|  Shingeki no Kyojin|   6|   null|         null|
|  6|     1575|Code Geass: Hangy...|   7|   null|         null|
|  7|     1735|  Naruto: Shippuuden|   8|   null|         null|
|  8|       30|Neon Genesis Evan...|   9|   null|         null|
|  9|    32281|      Kimi no Na wa.|  10|   null|         null|
| 10|    38000|    Kimetsu no Yaiba|  11|    Yes|   2022-08-03|
| 11|    23273|Shigatsu wa Kimi ...|  12|   null|         null|
| 12|    28851|      Koe no Katachi|  13

In [14]:
anime_dataframe = anime_dataframe.drop('date_watched')
anime_dataframe.show()

+---+---------+--------------------+----+-------+-------------+
| id|mal_index|               title|rank|watched|date_inputted|
+---+---------+--------------------+----+-------+-------------+
|  0|     5114|Fullmetal Alchemi...|   1|   null|         null|
|  1|    11061|Hunter x Hunter (...|   2|   null|         null|
|  2|     9253|         Steins;Gate|   3|   null|         null|
|  3|       21|           One Piece|   4|   null|         null|
|  4|     1535|          Death Note|   5|   null|         null|
|  5|    16498|  Shingeki no Kyojin|   6|   null|         null|
|  6|     1575|Code Geass: Hangy...|   7|   null|         null|
|  7|     1735|  Naruto: Shippuuden|   8|   null|         null|
|  8|       30|Neon Genesis Evan...|   9|   null|         null|
|  9|    32281|      Kimi no Na wa.|  10|   null|         null|
| 10|    38000|    Kimetsu no Yaiba|  11|    Yes|   2022-08-03|
| 11|    23273|Shigatsu wa Kimi ...|  12|   null|         null|
| 12|    28851|      Koe no Katachi|  13

In [15]:
anime_df_na = anime_dataframe.dropna()
anime_df_na.show()

+---+---------+-------------------+----+-------+-------------+
| id|mal_index|              title|rank|watched|date_inputted|
+---+---------+-------------------+----+-------+-------------+
| 10|    38000|   Kimetsu no Yaiba|  11|    Yes|   2022-08-03|
| 15|    40748|Jujutsu Kaisen (TV)|  16|    Yes|   2022-08-03|
+---+---------+-------------------+----+-------+-------------+



### Querying data

In [16]:
anime_dataframe.filter((col('watched') >= lit('Yes')) & (col('date_inputted') <= lit('2022-08-03'))).show()

+---+---------+-------------------+----+-------+-------------+
| id|mal_index|              title|rank|watched|date_inputted|
+---+---------+-------------------+----+-------+-------------+
| 10|    38000|   Kimetsu no Yaiba|  11|    Yes|   2022-08-03|
| 15|    40748|Jujutsu Kaisen (TV)|  16|    Yes|   2022-08-03|
+---+---------+-------------------+----+-------+-------------+



In [17]:
anime_dataframe.filter(anime_dataframe.rank.between(100, 110)).show()

+---+---------+--------------------+----+-------+-------------+
| id|mal_index|               title|rank|watched|date_inputted|
+---+---------+--------------------+----+-------+-------------+
| 99|    13125|      Shinsekai yori| 100|   null|         null|
|100|     7054|Kaichou wa Maid-s...| 101|   null|         null|
|101|    10165|            Nichijou| 102|   null|         null|
|102|    11617|     High School DxD| 103|   null|         null|
|103|    28999|           Charlotte| 104|   null|         null|
|104|    35120|   Devilman: Crybaby| 105|   null|         null|
|105|    47778|Kimetsu no Yaiba:...| 106|   null|         null|
|106|    23283|   Zankyou no Terror| 107|   null|         null|
|107|      392|     Yuu☆Yuu☆Hakusho| 108|   null|         null|
|108|    25777|Shingeki no Kyoji...| 109|   null|         null|
|109|      934|Higurashi no Naku...| 110|   null|         null|
+---+---------+--------------------+----+-------+-------------+



### Turning Spark dataframe into a Pandas dataframe

In [18]:
anime_pandas_df = anime_dataframe.pandas_api()
anime_pandas_df.head(20)

Unnamed: 0,id,mal_index,title,rank,watched,date_inputted
0,0,5114,Fullmetal Alchemist: Brotherhood,1,,
1,1,11061,Hunter x Hunter (2011),2,,
2,2,9253,Steins;Gate,3,,
3,3,21,One Piece,4,,
4,4,1535,Death Note,5,,
5,5,16498,Shingeki no Kyojin,6,,
6,6,1575,Code Geass: Hangyaku no Lelouch,7,,
7,7,1735,Naruto: Shippuuden,8,,
8,8,30,Neon Genesis Evangelion,9,,
9,9,32281,Kimi no Na wa.,10,,


In [19]:
anime_pandas_df.dtypes

id                int32
mal_index         int32
title            object
rank              int32
watched          object
date_inputted    object
dtype: object

In [20]:
anime_pandas_df.index

Int64Index([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9,
            ...
            290, 291, 292, 293, 294, 295, 296, 297, 298, 299],
           dtype='int64', length=300)

In [21]:
anime_pandas_df.sort_values(by='mal_index')

Unnamed: 0,id,mal_index,title,rank,watched,date_inputted
13,13,1,Cowboy Bebop,14,,
158,158,6,Trigun,159,,
39,39,19,Monster,40,,
17,17,20,Naruto,18,,
3,3,21,One Piece,4,,
8,8,30,Neon Genesis Evangelion,9,,
79,79,32,Neon Genesis Evangelion: The End of Evangelion,80,,
91,91,33,Kenpuu Denki Berserk,92,,
185,185,43,Koukaku Kidoutai,186,,
218,218,45,Rurouni Kenshin: Meiji Kenkaku Romantan,219,,


In [22]:
anime_pandas_df.dropna(how='any')

Unnamed: 0,id,mal_index,title,rank,watched,date_inputted
10,10,38000,Kimetsu no Yaiba,11,Yes,2022-08-03
15,15,40748,Jujutsu Kaisen (TV),16,Yes,2022-08-03


In [23]:
anime_pandas_df['watched'] = anime_pandas_df['watched'].fillna('No')
anime_pandas_df['date_inputted'] = anime_pandas_df['date_inputted'].fillna('Not yet')
anime_pandas_df.head(20)

Unnamed: 0,id,mal_index,title,rank,watched,date_inputted
0,0,5114,Fullmetal Alchemist: Brotherhood,1,No,Not yet
1,1,11061,Hunter x Hunter (2011),2,No,Not yet
2,2,9253,Steins;Gate,3,No,Not yet
3,3,21,One Piece,4,No,Not yet
4,4,1535,Death Note,5,No,Not yet
5,5,16498,Shingeki no Kyojin,6,No,Not yet
6,6,1575,Code Geass: Hangyaku no Lelouch,7,No,Not yet
7,7,1735,Naruto: Shippuuden,8,No,Not yet
8,8,30,Neon Genesis Evangelion,9,No,Not yet
9,9,32281,Kimi no Na wa.,10,No,Not yet


### Saving the file as a paquet

In [25]:
anime_pandas_df.to_parquet('Anime_Rankings.parquet' )

### Reading the parquet file

In [35]:
ps.read_parquet('Anime_Rankings.parquet')

Unnamed: 0,id,mal_index,title,rank,watched,date_inputted
0,0,5114,Fullmetal Alchemist: Brotherhood,1,No,Not yet
1,1,11061,Hunter x Hunter (2011),2,No,Not yet
2,2,9253,Steins;Gate,3,No,Not yet
3,3,21,One Piece,4,No,Not yet
4,4,1535,Death Note,5,No,Not yet
5,5,16498,Shingeki no Kyojin,6,No,Not yet
6,6,1575,Code Geass: Hangyaku no Lelouch,7,No,Not yet
7,7,1735,Naruto: Shippuuden,8,No,Not yet
8,8,30,Neon Genesis Evangelion,9,No,Not yet
9,9,32281,Kimi no Na wa.,10,No,Not yet
