## Read, Write and Validate

In [4]:
import pyspark
from pyspark.sql import SparkSession

# Build the notebook's SparkSession (getOrCreate hands back an existing
# session when there is one instead of creating a second).
session = (
    SparkSession.builder
    .appName("Read, Write and Validate")
    .getOrCreate()
)

# Leave the session as the cell's last expression so it gets displayed.
session

In [8]:
# Reach the JVM-side SparkContext through the session's Java handle.
sc = session._jsc.sc()

# Count the executor entries reported by the status tracker. One is
# subtracted -- presumably to leave out the driver's own entry; TODO confirm.
n_workers = sum(1 for executor_info in sc.statusTracker().getExecutorInfos()) - 1

print(n_workers)

1


In [12]:
# Directory prefix shared by the reads and writes in this notebook.
path = "Datasets/"

# Load the students CSV: the first row supplies the column names and
# Spark infers each column's datatype from the data.
students = session.read.csv(
    f"{path}students.csv",   # File to read
    header=True,             # First row is the header
    inferSchema=True,        # Automatically set the datatypes
)

In [13]:
# Preview the first 4 rows, converted to a pandas DataFrame for display.
students.limit(4).toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44


### Partitioned data sets 

In [16]:
# Wildcard read: every path under `path` whose name starts with "users".
partitioned = session.read.parquet(f"{path}users*")

# Explicit read: only the two parquet files named here.
users1_2 = session.read.parquet(
    f"{path}users1.parquet",
    f"{path}users2.parquet",
)


In [15]:
# Preview the first 6 rows of the wildcard read as a pandas DataFrame.
partitioned.limit(6).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,
5,2016-02-03 07:22:34,6,Kathryn,White,kwhite5@google.com,Female,195.131.81.179,3583136326049310.0,Indonesia,2/25/1983,69227.11,Account Executive,


In [17]:
# Preview the first 4 rows of the two explicitly listed files as a pandas DataFrame.
users1_2.limit(4).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,


### Validation

In [18]:
# Print each column's name, datatype and nullability to validate the inferred schema.
students.printSchema()

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)



In [20]:
# List the column names as a plain Python list.
students.columns

['gender',
 'race/ethnicity',
 'parental level of education',
 'lunch',
 'test preparation course',
 'math score',
 'reading score',
 'writing score']

In [23]:
# Summary statistics (count, min, max, mean) for the "math score" column only.
# toPandas() is used instead of show() to display the result.
(
    students
    .select("math score")
    .summary("count", "min", "max", "mean")
    .toPandas()
)

Unnamed: 0,summary,math score
0,count,1000.0
1,min,0.0
2,max,100.0
3,mean,66.089


### How to specify datatypes

In [29]:
from pyspark.sql.types import (
    StructField,
    StringType,
    DateType,
    IntegerType,
    BooleanType,
    FloatType,
    StructType
)

In [30]:
# Explicit schema for people.json: one nullable field per expected key.
# NOTE(review): 'timestamp' is declared as DateType -- confirm that a plain
# date (rather than a full timestamp type) is what this field should hold.
schema = [
    StructField("name", StringType(), nullable=True),
    StructField("email", StringType(), nullable=True),
    StructField("city", StringType(), nullable=True),
    StructField("mac", StringType(), nullable=True),
    StructField("timestamp", DateType(), nullable=True),
    StructField("creditcard", StringType(), nullable=True),
]

data_schema = StructType(schema)

# Read the JSON file with the schema declared above.
# NOTE(review): the preview of `people` in this notebook shows an all-null
# first row, which may point to a line that does not parse under this
# schema -- verify the file's layout (e.g. whether it needs multiLine).
people = session.read.schema(data_schema).json(f"{path}people.json")

In [31]:
# Preview the first 10 rows read with the explicit schema as a pandas DataFrame.
people.limit(10).toPandas()

Unnamed: 0,name,email,city,mac,timestamp,creditcard
0,,,,,,
1,Keeley Bosco,katlyn@jenkinsmaggio.net,Lake Gladysberg,08:fd:0b:cd:77:f7,2015-04-25,1228-1221-1221-1431
2,Rubye Jerde,juvenal@johnston.name,,90:4d:fa:42:63:a2,2015-04-25,1228-1221-1221-1431
3,Miss Darian Breitenberg,,,f9:0e:d3:40:cb:e9,2015-04-25,
4,Celine Ankunding,emery_kunze@rogahn.net,,3a:af:c9:0b:5c:08,2015-04-25,1228-1221-1221-1431
5,Dr. Araceli Lang,mavis_lehner@jacobi.name,Yvettemouth,9e:ea:28:41:2a:50,2015-04-25,1211-1221-1234-2201
6,Esteban Von,,,2d:e4:f0:dd:90:96,2015-04-25,
7,Everette Swift,gielle_jacobs@flatleyboehm.biz,,29:e0:54:7a:b7:ca,2015-04-25,
8,Terrell Boyle,augustine.conroy@keebler.name,Port Reaganfort,c5:32:09:5a:f7:15,2015-04-25,1228-1221-1221-1431
9,Miss Emmie Muller,,Kaleyhaven,be:dc:d2:57:81:8b,2015-04-25,


### Write data to disk

In [33]:
# Save the students DataFrame as CSV at path + "Write_test"; 'overwrite'
# mode replaces whatever a previous run left at that location.
# NOTE(review): no header option is passed here -- confirm whether the
# column names should be written as well (e.g. header=True).
(
    students
    .write
    .mode('overwrite')
    .csv(f"{path}Write_test")
)

### Create a DF from scratch

In [37]:
# (fruit name, quantity) pairs used as the rows of the new DataFrame.
lista = [
    ("Peras", 10),
    ("Bananas", 5),
    ("Arándanos", 11),
]

# Column definitions matching the layout of the tuples above.
schema = [
    StructField("Fruta", StringType(), nullable=True),
    StructField("Qty", IntegerType(), nullable=True),
]

data_schema = StructType(schema)

# Build the DataFrame from the in-memory rows and print it.
df = session.createDataFrame(lista, schema=data_schema)
df.show()

+---------+---+
|    Fruta|Qty|
+---------+---+
|    Peras| 10|
|  Bananas|  5|
|Arándanos| 11|
+---------+---+

