# 💫 PySpark App

This is my Apache Spark project. Here you will find some of the work I did while learning to use Spark with Python.

---

_You can find [@avcaliani](#) on [GitHub](https://github.com/avcaliani) or [GitLab](https://gitlab.com/avcaliani)._

In [2]:
#  ___                _   
# / __|_ __  __ _ _ _| |__
# \__ \ '_ \/ _` | '_| / /
# |___/ .__/\__,_|_| |_\_\
#     |_|                 
#
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!python -V

import os
import findspark

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-2.4.5-bin-hadoop2.7'
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession \
  .builder\
  .master('local[*]') \
  .getOrCreate()

Python 3.6.9


In [0]:
#                      _     
#  __ _ ___  ___  __ _| |___ 
# / _` / _ \/ _ \/ _` | / -_)
# \__, \___/\___/\__, |_\___|
# |___/          |___/       
#
# Bring the input CSV into the Colab runtime; the read step later in this
# notebook loads it by the name 'yob1997.csv'.
from google.colab import files
files.upload() # data/yob1997.csv

In [0]:
from pyspark.sql import DataFrame, Column
from pyspark.sql.functions import col, when, udf

In [0]:
# __   __    _ _    _      _           
# \ \ / /_ _| (_)__| |__ _| |_ ___ _ _ 
#  \ V / _` | | / _` / _` |  _/ _ \ '_|
#   \_/\__,_|_|_\__,_\__,_|\__\___/_|  


def validate(data: DataFrame) -> DataFrame:
  """Return `data` with an extra boolean `is_valid` column.

  A record is valid when both its `gender` and its `number` pass their
  individual checks. The per-field flags are temporary and are removed
  again before the frame is returned.
  """
  gender_flag = check_gender(data.gender)
  number_flag = check_number(data.number)

  flagged = (
    data
    .withColumn('gender_valid', gender_flag)
    .withColumn('number_valid', number_flag)
  )
  combined = is_record_valid(col('gender_valid'), col('number_valid'))
  flagged = flagged.withColumn('is_valid', combined)

  return flagged.drop('gender_valid', 'number_valid')


def check_gender(gender: Column) -> Column:
  """Build a boolean column that is true only where `gender` is "F" or "M".

  Any other value, including a missing one, falls through to `otherwise`
  and yields false.
  """
  is_known = gender.isin("F", "M")
  flag = when(is_known, True).otherwise(False)
  return flag.cast("boolean")


@udf('boolean')
def check_number(number: str) -> bool:
  """Return True when `number` is a whole number of at least 20000.

  `number` arrives as text (or None for a missing value). Missing, empty
  and non-numeric values are reported as False rather than raising.
  """
  if number is None:
    return False
  # isdecimal() is used instead of isdigit(): isdigit() also accepts
  # characters such as superscript digits ('²') that int() cannot parse,
  # which would raise ValueError inside the UDF.
  return number.isdecimal() and int(number) >= 20000


def is_record_valid(gender_valid: Column, number_valid: Column) -> Column:
  """Combine the two per-field flags into a single boolean column.

  The result is true only where both inputs are true; every other case
  falls through to `otherwise` and yields false.
  """
  both_ok = gender_valid & number_valid
  verdict = when(both_ok, True).otherwise(False)
  return verdict.cast("boolean")


In [6]:
#  __  __      _      
# |  \/  |__ _(_)_ _  
# | |\/| / _` | | ' \ 
# |_|  |_\__,_|_|_||_|
#
# Load the uploaded CSV, treating its first line as the header. No schema is
# supplied, so every column (including 'number') is read as a string.
data: DataFrame = spark \
  .read \
  .option('header', 'true') \
  .csv('yob1997.csv')

# Raw f-string: the banner's backslashes are literal characters, not escape
# sequences. The raw prefix prints exactly the same text while avoiding the
# invalid-escape-sequence warning a plain f-string produces.
print(rf"""
 ___             
| _ \__ ___ __ __
|   / _` \ V  V /
|_|_\__,_|\_/\_/ 

Input Records: {data.count()}
""")
data.printSchema()
data.show(5)


 ___             
| _ \__ ___ __ __
|   / _` \ V  V /
|_|_\__,_|\_/\_/ 

Input Records: 26970

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- number: string (nullable = true)

+-------+------+------+
|   name|gender|number|
+-------+------+------+
|  Emily|     F| 25732|
|Jessica|     F| 21042|
| Ashley|     F| 20896|
|  Sarah|     F| 20707|
| Hannah|     F| 20593|
+-------+------+------+
only showing top 5 rows



In [7]:
#  _   _   _ 
# (_) (_) (_)
#
# Keep only the records that pass validation, highest 'number' first.
# 'number' is a string column, so it is cast to a number for ordering only;
# sorting the raw strings compares them character by character (for example
# '100000' would sort below '20000'). The column itself stays a string.
valid_data = validate(data) \
  .filter(col('is_valid')) \
  .drop('is_valid') \
  .orderBy(col('number').cast('long').desc())

# Raw f-string: the banner's backslashes are literal characters, not escape
# sequences, so the raw prefix avoids invalid-escape-sequence warnings.
print(rf"""
__   __    _ _    _ 
\ \ / /_ _| (_)__| |
 \ V / _` | | / _` |
  \_/\__,_|_|_\__,_|

Valid Records: {valid_data.count()}
""")
valid_data.printSchema()
valid_data.show(5)
# Break the valid records down by gender.
valid_data.groupBy('gender').count().show()


__   __    _ _    _ 
\ \ / /_ _| (_)__| |
 \ V / _` | | / _` |
  \_/\__,_|_|_\__,_|

Valid Records: 25

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- number: string (nullable = true)

+-----------+------+------+
|       name|gender|number|
+-----------+------+------+
|    Michael|     M| 37549|
|      Jacob|     M| 34153|
|    Matthew|     M| 31514|
|Christopher|     M| 29105|
|     Joshua|     M| 28284|
+-----------+------+------+
only showing top 5 rows

+------+-----+
|gender|count|
+------+-----+
|     F|    6|
|     M|   19|
+------+-----+



In [0]:
#  ______    ___               
# | |__| |  / __| __ ___ _____ 
# |  ()  |  \__ \/ _` \ V / -_)
# |______|  |___/\__,_|\_/\___|
#
# Write the validated records to 'yob1997.valid.csv' as comma-delimited CSV
# with a header row, replacing any output left by a previous run.
(
  valid_data.write
  .mode('overwrite')
  .option("header", "true")
  .option("delimiter", ',')
  .option("nullValue", None)
  .option("emptyValue", None)
  .csv('yob1997.valid.csv')
)

#                      _     
#  __ _ ___  ___  __ _| |___ 
# / _` / _ \/ _ \/ _` | / -_)
# \__, \___/\___/\__, |_\___|
# |___/          |___/       
#
!rm output.zip
!zip -r output.zip yob1997.valid.csv
files.download('output.zip') 