# Writing Data

#### Resources
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html
* https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option

In [0]:
# Defining the schema and importing Types 
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType
# Explicit schema for countries.csv, built one column at a time.
# Each add() call takes (column name, Spark type, nullable): the first eight
# columns are declared non-nullable, the four *_REGION_ID columns nullable.
countries_schema = (
    StructType()
    .add("COUNTRY_ID", IntegerType(), False)
    .add("NAME", StringType(), False)
    .add("NATIONALITY", StringType(), False)
    .add("COUNTRY_CODE", StringType(), False)
    .add("ISO_ALPHA2", StringType(), False)
    .add("CAPITAL", StringType(), False)
    .add("POPULATION", DoubleType(), False)
    .add("AREA_KM2", IntegerType(), False)
    .add("REGION_ID", IntegerType(), True)
    .add("SUB_REGION_ID", IntegerType(), True)
    .add("INTERMEDIATE_REGION_ID", IntegerType(), True)
    .add("ORGANIZATION_REGION_ID", IntegerType(), True)
)

In [0]:
# Load countries.csv using the explicit schema; the file's first line is a header row
countries_df = (
    spark.read
    .schema(countries_schema)
    .option('header', True)
    .csv('/FileStore/countries.csv')
)

In [0]:
# Show the contents of countries_df in the notebook output
display(countries_df)

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230.0,30,30.0,,30
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748.0,20,70.0,,20
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741.0,50,40.0,,20
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199.0,40,20.0,,30
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468.0,20,70.0,,20
6,Angola,Angolan,AGO,AO,Luanda,31825295.0,1246700.0,50,160.0,80.0,20
7,Anguilla,Anguillan,AIA,AI,The Valley,14869.0,91.0,10,10.0,60.0,40
8,Antarctica,Antarctic,ATA,AQ,McMurdo Station,1106.0,14200000.0,40,,,30
9,Antigua and Barbuda,Antiguan or Barbudan,ATG,AG,St. John's,97118.0,442.0,10,10.0,60.0,40
10,Argentina,Argentine,ARG,AR,Buenos Aires,44780677.0,2780400.0,10,10.0,40.0,40


In [0]:
# Persist countries_df to the DBFS in CSV format, emitting a header row.
# No save mode is given here, so the writer's default mode applies.
countries_df.write.option('header', True).csv('/FileStore/countries_out')

In [0]:
# Load the countries_out CSV output back into a DataFrame, reusing countries_schema
df = (
    spark.read
    .format('csv')
    .schema(countries_schema)
    .option('header', True)
    .load('/FileStore/countries_out')
)

In [0]:
# Show the DataFrame that was read back from countries_out
display(df)

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230.0,30,30.0,,30
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748.0,20,70.0,,20
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741.0,50,40.0,,20
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199.0,40,20.0,,30
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468.0,20,70.0,,20
6,Angola,Angolan,AGO,AO,Luanda,31825295.0,1246700.0,50,160.0,80.0,20
7,Anguilla,Anguillan,AIA,AI,The Valley,14869.0,91.0,10,10.0,60.0,40
8,Antarctica,Antarctic,ATA,AQ,McMurdo Station,1106.0,14200000.0,40,,,30
9,Antigua and Barbuda,Antiguan or Barbudan,ATG,AG,St. John's,97118.0,442.0,10,10.0,60.0,40
10,Argentina,Argentine,ARG,AR,Buenos Aires,44780677.0,2780400.0,10,10.0,40.0,40


In [0]:
# mode='overwrite' replaces any data already present at the target path
# rather than requiring the path to be new
df.write.csv('/FileStore/output/countries_out', mode='overwrite', header=True)

In [0]:
# Show df again after the write
display(df)

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,REGION_ID,SUB_REGION_ID,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID
1,Afghanistan,Afghan,AFG,AF,Kabul,38041754.0,652230.0,30,30.0,,30
2,Albania,Albanian,ALB,AL,Tirana,2880917.0,28748.0,20,70.0,,20
3,Algeria,Algerian,DZA,DZ,Algiers,43053054.0,2381741.0,50,40.0,,20
4,American Samoa,American Samoan,ASM,AS,Pago Pago,55312.0,199.0,40,20.0,,30
5,Andorra,Andorran,AND,AD,Andorra la Vella,77142.0,468.0,20,70.0,,20
6,Angola,Angolan,AGO,AO,Luanda,31825295.0,1246700.0,50,160.0,80.0,20
7,Anguilla,Anguillan,AIA,AI,The Valley,14869.0,91.0,10,10.0,60.0,40
8,Antarctica,Antarctic,ATA,AQ,McMurdo Station,1106.0,14200000.0,40,,,30
9,Antigua and Barbuda,Antiguan or Barbudan,ATG,AG,St. John's,97118.0,442.0,10,10.0,60.0,40
10,Argentina,Argentine,ARG,AR,Buenos Aires,44780677.0,2780400.0,10,10.0,40.0,40


In [0]:
# Write df as CSV partitioned by REGION_ID and then SUB_REGION_ID
# (the output is laid out in directories such as REGION_ID=10/).
# NOTE(review): df was loaded from this same path; overwriting a DataFrame's
# own source location may fail or lose data -- confirm this is intended.
df.write.partitionBy('REGION_ID', 'SUB_REGION_ID').csv(
    '/FileStore/countries_out', mode='overwrite', header=True
)

In [0]:
# Load a single partition directory (substitute your own file path).
# The path already points inside REGION_ID=10, so the resulting DataFrame has
# no REGION_ID column; no schema is supplied for this read.
df2 = spark.read.option('header', True).csv('/FileStore/countries_out/REGION_ID=10/')

In [0]:
# Show the rows loaded from the REGION_ID=10 partition
display(df2)

COUNTRY_ID,NAME,NATIONALITY,COUNTRY_CODE,ISO_ALPHA2,CAPITAL,POPULATION,AREA_KM2,INTERMEDIATE_REGION_ID,ORGANIZATION_REGION_ID,SUB_REGION_ID
7,Anguilla,Anguillan,AIA,AI,The Valley,14869.0,91,60.0,40,10
9,Antigua and Barbuda,Antiguan or Barbudan,ATG,AG,St. John's,97118.0,442,60.0,40,10
10,Argentina,Argentine,ARG,AR,Buenos Aires,44780677.0,2780400,40.0,40,10
12,Aruba,Aruban,ABW,AW,Oranjestad,106314.0,180,60.0,40,10
16,Bahamas,Bahamian,BHS,BS,Nassau,389482.0,13943,60.0,40,10
19,Barbados,Barbadian,BRB,BB,Bridgetown,287025.0,430,60.0,40,10
22,Belize,Belizean,BLZ,BZ,Belmopan,390353.0,22966,20.0,40,10
26,Bolivia (Plurinational State of),Bolivian,BOL,BO,Sucre,11513100.0,1098581,40.0,40,10
27,"Bonaire, Sint Eustatius and Saba",Sint Eustatius and Saba,BES,BQ,Kralendijk,25711.0,328,,40,10
30,Bouvet Island,Bouvet Island,BVT,BV,,0.0,49,40.0,40,10


In [0]:
# Write df2 out as CSV with a header row, replacing existing output at the path.
# NOTE(review): this path starts with "/Filestore", while every other cell uses
# "/FileStore" -- confirm whether the different casing is intentional.
df2.write.mode("overwrite").option("header", True).csv("/Filestore/countries_out/REGION_ID=10/")

In [0]:
# Write df as header-bearing CSV, partitioned by REGION_ID and SUB_REGION_ID,
# replacing anything already at the destination
(
    df.write
    .format('csv')
    .option('header', True)
    .mode('overwrite')
    .partitionBy('REGION_ID', 'SUB_REGION_ID')
    .save('/FileStore/tables/countries_out_2')
)