# Defining a schema

- Defining a schema explicitly, instead of letting Spark infer one, helps with data quality and import performance. As mentioned during the lesson, we'll create a simple schema to read in the following columns:
> - Name
> - Age
> - City

- The `Name` and `City` columns are `StringType()` and the `Age` column is an `IntegerType()`.

## Instructions

- Import * from the `pyspark.sql.types` library.
- Define a new schema using the `StructType` class.
- Define a `StructField` for `name`, `age`, and `city`. Each field should use the correct data type and not be nullable.

In [4]:
# Initialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List whichever packages you need here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [5]:
# Entry point for Spark 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On YARN, add .master("yarn") to the builder:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

sc = spark.sparkContext

In [6]:
# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType class
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

In [7]:
print(people_schema)

StructType(List(StructField(name,StringType,false),StructField(age,IntegerType,false),StructField(city,StringType,false)))


In [8]:
people_schema = StructType([
  # Define a StructField for each field
  StructField('id', IntegerType(), False),
  StructField('person_id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('sex', StringType(), False),
  StructField('date of birth', StringType(), False)
])

In [13]:
# Read the CSV with the explicit schema; header=True skips the header row
file_path = "file:///home/talentum/test-jupyter/P3/M1/SM1/1_IntrotodatacleaningwithApacheSpark/people.csv"
people = spark.read.csv(file_path, header=True, schema=people_schema)

In [15]:
people.show()

+---+---------+-----------------+------+-------------+
| id|person_id|             name|   sex|date of birth|
+---+---------+-----------------+------+-------------+
|  0|      100|   Penelope Lewis|female|   1990-08-31|
|  1|      101|    David Anthony|  male|   1971-10-14|
|  2|      102|        Ida Shipp|female|   1962-05-24|
|  3|      103|     Joanna Moore|female|   2017-03-10|
|  4|      104|   Lisandra Ortiz|female|   2020-08-05|
|  5|      105|    David Simmons|  male|   1999-12-30|
|  6|      106|    Edward Hudson|  male|   1983-05-09|
|  7|      107|     Albert Jones|  male|   1990-09-13|
|  8|      108| Leonard Cavender|  male|   1958-08-08|
|  9|      109|   Everett Vadala|  male|   2005-05-24|
| 10|      110| Freddie Claridge|  male|   2002-05-07|
| 11|      111|Annabelle Rosseau|female|   1989-07-13|
| 12|      112|    Eulah Emanuel|female|   1976-01-19|
| 13|      113|       Shaun Love|  male|   1970-05-26|
| 14|      114|Alejandro Brennan|  male|   1980-12-22|
| 15|     

In [16]:
# Write the DataFrame back out as CSV files (no header row is written by default)
people.write.csv("file:///home/talentum/people_df/")

In [17]:
new_schema = StructType([
  # Define a StructField for each field
  StructField('id', IntegerType(), False),
  StructField('person_id', IntegerType(), False),
  StructField('name', StringType(), False)])

# Read the written files back, keeping only the first three columns
new_people = spark.read.format("csv").load("file:///home/talentum/people_df/", schema=new_schema)

In [18]:
new_people.show()

+---+---------+-----------------+
| id|person_id|             name|
+---+---------+-----------------+
|  0|      100|   Penelope Lewis|
|  1|      101|    David Anthony|
|  2|      102|        Ida Shipp|
|  3|      103|     Joanna Moore|
|  4|      104|   Lisandra Ortiz|
|  5|      105|    David Simmons|
|  6|      106|    Edward Hudson|
|  7|      107|     Albert Jones|
|  8|      108| Leonard Cavender|
|  9|      109|   Everett Vadala|
| 10|      110| Freddie Claridge|
| 11|      111|Annabelle Rosseau|
| 12|      112|    Eulah Emanuel|
| 13|      113|       Shaun Love|
| 14|      114|Alejandro Brennan|
| 15|      115|Robert Mcreynolds|
| 16|      116|   Carla Spickard|
| 17|      117|Florence Eberhart|
| 18|      118|     Tina Gaskins|
| 19|      119| Florence Mulhern|
+---+---------+-----------------+
only showing top 20 rows

