#### Changing Data Types
* data types in spark: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
* cast: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.cast.html?highlight=cast#pyspark.sql.Column.cast

In [0]:
# Load countries.csv with an explicit schema rather than letting Spark pick the types.
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType

countries_path = 'dbfs:/FileStore/read_write_data/countries.csv'

# Each tuple below is (column name, Spark data type, nullable flag).
# NOTE(review): CAPITAL is declared non-nullable, yet rows displayed later in this
# notebook show blank capitals -- confirm how the CSV reader treats the nullable flag.
countries_schema = StructType([
    StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in (
        ("COUNTRY_ID", IntegerType(), False),
        ("NAME", StringType(), False),
        ("NATIONALITY", StringType(), False),
        ("COUNTRY_CODE", StringType(), False),
        ("ISO_ALPHA2", StringType(), False),
        ("CAPITAL", StringType(), False),
        ("POPULATION", DoubleType(), False),
        ("AREA_KM2", IntegerType(), False),
        ("REGION_ID", IntegerType(), True),
        ("SUB_REGION_ID", IntegerType(), True),
        ("INTERMEDIATE_REGION_ID", IntegerType(), True),
        ("ORGANIZATION_REGION_ID", IntegerType(), True),
    )
])

# The first line of the file is a header row, so it is not read as data.
countries = spark.read.csv(countries_path, schema=countries_schema, header=True)

In [0]:
# Import every function from pyspark.sql.functions for use in the cells below.
# NOTE: a wildcard import also brings in names that shadow Python built-ins (e.g. round).
from pyspark.sql.functions import *

In [0]:
# Using .dtypes to view each column's name and data type; here they follow the schema passed to the reader
countries.dtypes

In [0]:
# Read the same file a second time, without a schema, to compare the resulting data types
countries_dt = spark.read.csv(countries_path, header=True)

In [0]:
# With no schema supplied, every column comes back as a string
countries_dt.dtypes

In [0]:
# Cast the string population column to IntegerType() and inspect the resulting type.
# IntegerType was already imported in the first cell, where the schema is defined.
countries_dt.select(
    countries_dt['population'].cast(IntegerType())
).dtypes

In [0]:
# Cast the population column of the typed DataFrame to StringType() and inspect the resulting type.
# StringType was already imported in the first cell, where the schema is defined.
countries.select(
    countries['population'].cast(StringType())
).dtypes

#### Math Functions
* Math Functions: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#math-functions

In [0]:
# round is imported explicitly for this cell; note that it shadows Python's built-in round().
from pyspark.sql.functions import round

# Simple arithmetic to return the population in millions.
# The derived column is named with alias() instead of renaming Spark's
# auto-generated '(population / 1000000)' label, so the cell does not depend on
# how Spark happens to format expression names.
countries_2 = countries.select((countries['population'] / 1000000).alias('population_m'))
countries_2.limit(5).display()

# Using the round function to round to 2 decimal places; alias() keeps the column name population_m
countries_2.select(round(countries_2['population_m'], 2).alias('population_m')).limit(5).display()

population_m
38.041754
2.880917
43.053054
0.055312
0.077142


population_m
38.04
2.88
43.05
0.06
0.08


In [0]:
# Add population_m_r1 to the countries df: population in millions, rounded to 1 decimal place
(
    countries
    .withColumn('population_m_r1', round(countries['population'] / 1000000, 1))
    .limit(5)
    .display()
)

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,population_m_r1
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230,30,30,,30,38.0
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748,20,70,,20,2.9
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741,50,40,,20,43.1
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199,40,20,,30,0.1
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468,20,70,,20,0.1


#### A bit of sorting

In [0]:
# Importing the asc and desc sort helpers.
# NOTE(review): the sorting cells below call the Column methods .asc() / .desc_nulls_last() instead,
# so these imports appear unused in the visible cells.
from pyspark.sql.functions import asc, desc

In [0]:
# Five least-populated countries: order by population, smallest first
countries.orderBy(countries['population'].asc()).limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID
97,Heard Island and McDonald Islands,Heard Island or McDonald Islands,HMD,HM,,0.0,368,40,90,,30
30,Bouvet Island,Bouvet Island,BVT,BV,,0.0,49,10,10,40.0,40
206,South Georgia and the South Sandwich Islands,South Georgia or South Sandwich Islands,SGS,GS,Grytviken,30.0,3903,10,10,40.0,40
175,Pitcairn,Pitcairn Island,PCN,PN,Adamstown,68.0,43,40,20,,30
79,French Southern Territories,French Southern Territories,ATF,TF,Saint Pierre,150.0,439780,50,160,50.0,20


In [0]:
# Five most-populated countries: order by population, largest first, with any nulls placed last
countries.orderBy(countries['population'].desc_nulls_last()).limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID
45,China,Chinese,CHN,CN,Beijing,1433783686.0,9706961,30,60,,30
103,India,Indian,IND,IN,New Delhi,1366417754.0,3287590,30,30,,30
236,United States of America,American,USA,US,"Washington, D.C.",329064917.0,9372610,10,80,,10
104,Indonesia,Indonesian,IDN,ID,Jakarta,270625568.0,1904569,30,100,,30
167,Pakistan,Pakistani,PAK,PK,Islamabad,216565318.0,881912,30,30,,30


#### String Functions
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#string-functions

In [0]:
# Join the country name and the lower-cased country code with '-' using concat_ws
(
    countries
    .select(
        concat_ws('-', countries['name'], lower(countries['country_code']))
        .alias('Country-country_code')
    )
    .limit(5)
    .display()
)

Country-country_code
Afghanistan-afg
Albania-alb
Algeria-dza
American Samoa-asm
Andorra-and


#### Datetime Functions
* Datetime Functions: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#datetime-functions
* Datetime Patterns: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html

In [0]:
# Using current_timestamp and withColumn to add a 'timestamp' column to the df.
# NOTE: this rebinds `countries`, so every later cell sees the extra column.
# The timestamps shown in this notebook's outputs differ from cell to cell, which
# suggests the value is recomputed whenever the DataFrame is evaluated.
countries = countries.withColumn('timestamp', current_timestamp())
countries.limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,timestamp
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230,30,30,,30,2023-05-09T06:07:15.968+0000
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748,20,70,,20,2023-05-09T06:07:15.968+0000
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741,50,40,,20,2023-05-09T06:07:15.968+0000
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199,40,20,,30,2023-05-09T06:07:15.968+0000
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468,20,70,,20,2023-05-09T06:07:15.968+0000


In [0]:
# Pull just the year component out of the timestamp column
(
    countries
    .select(year(countries['timestamp']))
    .limit(5)
    .display()
)

year(timestamp)
2023
2023
2023
2023
2023


In [0]:
# Add a constant string column with lit(); the value is in day-month-year form
# and is converted from string to a date in the next cell.
# NOTE: this rebinds `countries`, so later cells see the extra 'date_lit' column.
countries = countries.withColumn('date_lit', lit('08-05-2023'))
countries.limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,timestamp,date_lit
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230,30,30,,30,2023-05-09T06:07:16.694+0000,08-05-2023
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748,20,70,,20,2023-05-09T06:07:16.694+0000,08-05-2023
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741,50,40,,20,2023-05-09T06:07:16.694+0000,08-05-2023
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199,40,20,,30,2023-05-09T06:07:16.694+0000,08-05-2023
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468,20,70,,20,2023-05-09T06:07:16.694+0000,08-05-2023


In [0]:
# Use the to_date function to convert the string to a date.
# The pattern 'dd-MM-yyyy' has to match the layout of date_lit ('08-05-2023' -> 2023-05-08).
# NOTE: this rebinds `countries`, so later cells see the extra 'conv_date' column.
countries = countries.withColumn('conv_date', to_date(countries['date_lit'],'dd-MM-yyyy'))
countries.limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,timestamp,date_lit,conv_date
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230,30,30,,30,2023-05-09T06:07:17.186+0000,08-05-2023,2023-05-08
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748,20,70,,20,2023-05-09T06:07:17.186+0000,08-05-2023,2023-05-08
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741,50,40,,20,2023-05-09T06:07:17.186+0000,08-05-2023,2023-05-08
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199,40,20,,30,2023-05-09T06:07:17.186+0000,08-05-2023,2023-05-08
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468,20,70,,20,2023-05-09T06:07:17.186+0000,08-05-2023,2023-05-08


#### Filtering Dataframes
* filter: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html
* Operators for conditional statements: https://spark.apache.org/docs/2.3.0/api/sql/index.html

In [0]:
# Filtering records where population is greater than 1b
one_b = 1000000000
countries.filter(countries['population']>one_b).limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,timestamp,date_lit,conv_date
45,China,Chinese,CHN,CN,Beijing,1433783686.0,9706961,30,60,,30,2023-05-09T06:08:24.350+0000,08-05-2023,2023-05-08
103,India,Indian,IND,IN,New Delhi,1366417754.0,3287590,30,30,,30,2023-05-09T06:08:24.350+0000,08-05-2023,2023-05-08


In [0]:
# Using the locate function inside the filter condition.
# locate gives the 1-based position of the first "B" in the capital, so == 1 keeps
# only the capitals that start with "B".
countries.filter(locate("B", countries['capital'])==1).limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,timestamp,date_lit,conv_date
10,Argentina,Argentine,ARG,AR,Buenos Aires,44780677.0,2780400,10,10,40.0,40,2023-05-09T06:08:53.782+0000,08-05-2023,2023-05-08
15,Azerbaijan,Azerbaijani,AZE,AZ,Baku,10047718.0,86600,30,170,,30,2023-05-09T06:08:53.782+0000,08-05-2023,2023-05-08
19,Barbados,Barbadian,BRB,BB,Bridgetown,287025.0,430,10,10,60.0,40,2023-05-09T06:08:53.782+0000,08-05-2023,2023-05-08
21,Belgium,Belgian,BEL,BE,Brussels,11539328.0,30528,20,150,,20,2023-05-09T06:08:53.782+0000,08-05-2023,2023-05-08
22,Belize,Belizean,BLZ,BZ,Belmopan,390353.0,22966,10,10,20.0,40,2023-05-09T06:08:53.782+0000,08-05-2023,2023-05-08


In [0]:
# Countries whose name has more than 15 characters and whose region id is not 10.
# Each comparison is parenthesised because & binds tighter than > and != in Python.
(
    countries
    .filter((length(countries['NAME']) > 15) & (countries['REGION_ID'] != 10))
    .limit(5)
    .display()
)

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,timestamp,date_lit,conv_date
28,Bosnia and Herzegovina,Bosnian or Herzegovinian,BIH,BA,Sarajevo,3301000.0,51209,20,70,,20,2023-05-09T06:28:50.444+0000,08-05-2023,2023-05-08
32,British Indian Ocean Territory,BIOT,IOT,IO,Point Marianne,4000.0,60,50,160,50.0,20,2023-05-09T06:28:50.444+0000,08-05-2023,2023-05-08
33,Brunei Darussalam,Bruneian,BRN,BN,Bandar Seri Begawan,433285.0,5765,30,100,,30,2023-05-09T06:28:50.444+0000,08-05-2023,2023-05-08
42,Central African Republic,Central African,CAF,CF,Bangui,4745185.0,622984,50,160,80.0,20,2023-05-09T06:28:50.444+0000,08-05-2023,2023-05-08
46,Christmas Island,Christmas Island,CXR,CX,Flying Fish Cove,1843.0,135,40,90,,30,2023-05-09T06:28:50.444+0000,08-05-2023,2023-05-08


In [0]:
# Same filter as the previous cell (name longer than 15 characters, region id not 10),
# written as a SQL expression string instead of Column objects
(
    countries
    .filter('length(name) > 15 and region_id != 10')
    .limit(5)
    .display()
)

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,timestamp,date_lit,conv_date
28,Bosnia and Herzegovina,Bosnian or Herzegovinian,BIH,BA,Sarajevo,3301000.0,51209,20,70,,20,2023-05-09T06:21:43.115+0000,08-05-2023,2023-05-08
32,British Indian Ocean Territory,BIOT,IOT,IO,Point Marianne,4000.0,60,50,160,50.0,20,2023-05-09T06:21:43.115+0000,08-05-2023,2023-05-08
33,Brunei Darussalam,Bruneian,BRN,BN,Bandar Seri Begawan,433285.0,5765,30,100,,30,2023-05-09T06:21:43.115+0000,08-05-2023,2023-05-08
42,Central African Republic,Central African,CAF,CF,Bangui,4745185.0,622984,50,160,80.0,20,2023-05-09T06:21:43.115+0000,08-05-2023,2023-05-08
46,Christmas Island,Christmas Island,CXR,CX,Flying Fish Cove,1843.0,135,40,90,,30,2023-05-09T06:21:43.115+0000,08-05-2023,2023-05-08


#### Conditional Statements
* when: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.when.html

In [0]:
# Using the when condition to add a population_size column: 'big' when population is over 100m, otherwise 'not that big'
countries.withColumn('population_size', when(countries['population'] > 100000000, 'big').otherwise('not that big')).limit(20).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,timestamp,date_lit,conv_date,population_size
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230,30,30.0,,30,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748,20,70.0,,20,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741,50,40.0,,20,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199,40,20.0,,30,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468,20,70.0,,20,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
6,Angola,Angolan,AGO,AO,Luanda,31825295.0,1246700,50,160.0,80.0,20,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
7,Anguilla,Anguillan,AIA,AI,The Valley,14869.0,91,10,10.0,60.0,40,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
8,Antarctica,Antarctic,ATA,AQ,McMurdo Station,1106.0,14200000,40,,,30,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
9,Antigua and Barbuda,Antiguan or Barbudan,ATG,AG,St. John's,97118.0,442,10,10.0,60.0,40,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big
10,Argentina,Argentine,ARG,AR,Buenos Aires,44780677.0,2780400,10,10.0,40.0,40,2023-05-09T06:30:55.678+0000,08-05-2023,2023-05-08,not that big


#### Expr:
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.expr.html?highlight=expr#pyspark.sql.functions.expr

In [0]:
# Add an area_class column using a SQL CASE expression passed to expr():
# 'large' if AREA_KM2 > 1000000, 'medium' if AREA_KM2 > 300000, otherwise 'small'.
# The branches are checked in order, so 'medium' only applies to areas that are not 'large'.
countries.withColumn('area_class', expr(
    "case when area_km2 > 1000000 then 'large' when area_km2 > 300000 then 'medium' else 'small' end"
)).limit(5).display()

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,area_class
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230,30,30,,30,medium
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748,20,70,,20,small
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741,50,40,,20,large
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199,40,20,,30,small
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468,20,70,,20,small
