In [1]:
# Optional Colab setup (left commented out): mount Google Drive under
# /content/mnt and expose the "Colab Notebooks" folder as an importable path.
# Later cells read their input files from /content/mnt/MyDrive/, so the drive
# must be mounted before they can run on a fresh runtime.
# import os, sys
# from google.colab import drive
# drive.mount('/content/mnt')
# nb_path = '/content/notebooks'
# os.symlink('/content/mnt/My Drive/Colab Notebooks', nb_path)
# sys.path.insert(0, nb_path)  # or append(nb_path)

In [2]:
# One-time environment setup (left commented out): install a headless Java 8
# JDK, download and unpack Spark 2.3.1 (Hadoop 2.7 build), and install
# findspark. The JAVA_HOME / SPARK_HOME values set in the next cell refer to
# these installs.
# !apt-get update
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
# !tar xf spark-2.3.1-bin-hadoop2.7.tgz
# !pip install -q findspark

In [3]:
# Bootstrap a local Spark session. The order of the steps below matters.
import os
# Point the process at the Java and Spark installs before anything Spark-related runs.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

# List the working directory (sanity check that the Spark folder is present).
!ls

# findspark.init() runs before `import pyspark` so that the Spark install
# referenced by SPARK_HOME is the one that gets imported.
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
# getOrCreate() hands back the already-running session if there is one.
spark = SparkSession.builder.appName("ReadwriteVal").getOrCreate() 
# Bare last expression: display the session object as the cell output.
spark

mnt	   part_parquet		      spark-2.3.1-bin-hadoop2.7.tgz
notebooks  sample_data		      spark-warehouse
parquet    spark-2.3.1-bin-hadoop2.7  write_test2.csv


In [4]:
# Number of entries in the executor memory-status map of the running context.
# NOTE(review): this counts executor entries rather than CPU cores, despite the
# variable name -- confirm which of the two is actually wanted.
executor_memory_status = spark._jsc.sc().getExecutorMemoryStatus()
cores = executor_memory_status.keySet().size()
cores

1

# **Reading CSV FILES**

In [5]:
# Folder on the mounted drive that holds every input file used in this notebook.
path = '/content/mnt/MyDrive/'

# First row is the header; let Spark infer the column types from the data.
students_csv = path + 'students.csv'
student = spark.read.csv(students_csv, header=True, inferSchema=True)

In [6]:
# Peek at the first five rows (plain-text table).
student.show(n=5)

+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|     

In [7]:
# Same preview as a pandas DataFrame; limit first so only five rows are collected.
student_head = student.limit(5)
student_head.toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44
4,male,group C,some college,standard,none,76,78,75


In [8]:
# Show the column names and the types that inferSchema picked for the CSV.
student.printSchema()

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)



In [9]:
# Look up a single column's field in the schema and display its data type.
math_score_field = student.schema['math score']
math_score_field.dataType

IntegerType

In [10]:
# summary() works like describe(), but only the requested statistics are computed.
score_columns = student.select('math score', 'reading score')
score_columns.summary('count', 'max', 'min').show()

+-------+----------+-------------+
|summary|math score|reading score|
+-------+----------+-------------+
|  count|      1000|         1000|
|    max|       100|          100|
|    min|         0|           17|
+-------+----------+-------------+



# **READING JSON**

In [11]:
# Explicit schema for the JSON file read below, so Spark uses these column
# types instead of inferring them.

# NOTE(review): wildcard import kept as-is because later cells use names it
# exposes (e.g. StructType).
from pyspark.sql.types import *

# (column name, Spark type) pairs; every column is declared nullable.
data_schema = [
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('name', StringType()),
        ('email', StringType()),
        ('city', StringType()),
        ('mac', StringType()),
        ('timestamp', DateType()),
        ('creditcard', StringType()),
    ]
]


In [12]:
# Wrap the list of fields in a StructType so it can be passed as a read schema.
final_struc = StructType(data_schema)

In [13]:
# Read the JSON file with the explicit schema rather than an inferred one.
people_json = path + 'people.json'
people = spark.read.schema(final_struc).json(people_json)

In [14]:
# Peek at the first five rows.
# NOTE(review): the first row comes back entirely null -- possibly a line in
# the file that is not a standalone JSON record; confirm the file layout.
people.show(n=5)

+--------------------+--------------------+---------------+-----------------+----------+-------------------+
|                name|               email|           city|              mac| timestamp|         creditcard|
+--------------------+--------------------+---------------+-----------------+----------+-------------------+
|                null|                null|           null|             null|      null|               null|
|        Keeley Bosco|katlyn@jenkinsmag...|Lake Gladysberg|08:fd:0b:cd:77:f7|2015-04-25|1228-1221-1221-1431|
|         Rubye Jerde|juvenal@johnston....|           null|90:4d:fa:42:63:a2|2015-04-25|1228-1221-1221-1431|
|Miss Darian Breit...|                null|           null|f9:0e:d3:40:cb:e9|2015-04-25|               null|
|    Celine Ankunding|emery_kunze@rogah...|           null|3a:af:c9:0b:5c:08|2015-04-25|1228-1221-1221-1431|
+--------------------+--------------------+---------------+-----------------+----------+-------------------+
only showing top 5 

In [15]:
# Confirm the explicit schema was applied (e.g. `timestamp` should be a date).
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



# **Reading PARQUET FILES**

In [16]:
# Input folder on the mounted drive (same location as used for the CSV/JSON reads).
path = '/content/mnt/MyDrive/'

# Load a single Parquet file; the schema comes from the file itself.
users1_file = path + 'users1.parquet'
parquet = spark.read.parquet(users1_file)

In [17]:
# Plain-text preview of the first five rows (long values are truncated).
parquet.show(n=5)

+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|     country|birthdate|   salary|               title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|   Indonesia| 3/8/1971| 49756.53|    Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |      Canada|1/16/1968|150280.17|       Accountant IV|        |
|2016-02-03 01:09:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|      Russia| 2/1/1960|144972.51| Structural

In [18]:
# Same preview as a pandas DataFrame; limit first so only five rows are collected.
parquet_head = parquet.limit(5)
parquet_head.toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,
3,2016-02-03 00:36:21,4,Denise,Riley,driley3@gmpg.org,Female,140.35.109.83,3576031598965625.0,China,4/8/1997,90263.05,Senior Cost Accountant,
4,2016-02-03 05:05:31,5,Carlos,Burns,cburns4@miitbeian.gov.cn,,169.113.235.40,5602256255204850.0,South Africa,,,,


In [19]:
# Read every Parquet file whose name starts with "users" via a glob pattern.
all_users_glob = path + 'users*'
partitioned = spark.read.parquet(all_users_glob)
partitioned.show(n=5)

+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|     country|birthdate|   salary|               title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|   Indonesia| 3/8/1971| 49756.53|    Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |      Canada|1/16/1968|150280.17|       Accountant IV|        |
|2016-02-03 01:09:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|      Russia| 2/1/1960|144972.51| Structural

In [20]:
# Read only users1.parquet and users2.parquet into a single DataFrame.
# The reader option is spelled 'basePath'; the previous key 'bathpath' was a
# typo that Spark silently ignored, so the base path was never actually set.
parquet_files = [path + 'users1.parquet', path + 'users2.parquet']
users1_2 = spark.read.option('basePath', path).parquet(*parquet_files)
users1_2.show(5)

+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|     country|birthdate|   salary|               title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|   Indonesia| 3/8/1971| 49756.53|    Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |      Canada|1/16/1968|150280.17|       Accountant IV|        |
|2016-02-03 01:09:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|      Russia| 2/1/1960|144972.51| Structural

# **Reading S3 FILES**



In [21]:
# Illustrative example only (left commented out): read selected year
# partitions of a partitioned Parquet table straight from an S3 bucket by
# passing several paths to the same parquet() call.
# Note that key3 is defined but not included in the read below.
# bucket = 'my_bucket'
# key1 = "partitioned_data/Table1/year=2016/*"
# key2 = "partitioned_data/Table1/year=2017/*"
# key3 = "partitioned_data/Table1/year=2018/*"


# table1 = spark.read.parquet('s3://' + bucket + '/' + key1, 
#                             's3://' + bucket + '/' + key2)

# table1.show(10)

# **WRITING FILES**


In [22]:
# Write the student data as CSV to 'write_test.csv', replacing any previous
# output. The next cell expects part files inside this path, i.e. Spark
# creates a directory here, not a single file.
student.write.csv('write_test.csv', mode='overwrite')

In [23]:
# Spark wrote 'write_test.csv' as a directory of part files. To end up with a
# single, predictably named file (write_test2.csv), move the part file out of
# that directory through the Hadoop FileSystem API, then remove the directory.

from py4j.java_gateway import java_import

java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
source_dir = spark._jvm.Path('write_test.csv')
target_file = spark._jvm.Path('write_test2.csv')

# globStatus may return null or an empty array when nothing matches.
part_files = fs.globStatus(spark._jvm.Path('write_test.csv/part*'))
if part_files is None or len(part_files) == 0:
    raise FileNotFoundError("no part files found under 'write_test.csv'")
if len(part_files) > 1:
    # Moving only the first part and deleting the directory would silently
    # drop the remaining rows.
    raise RuntimeError(
        "expected exactly one part file under 'write_test.csv', found %d; "
        "write with coalesce(1) first" % len(part_files)
    )

file = part_files[0].getPath().getName()

# Clear a stale target from a previous run so the move does not depend on
# filesystem-specific overwrite behaviour.
if fs.exists(target_file):
    fs.delete(target_file, False)

# rename() signals failure through its return value; only remove the source
# directory once the data has really been moved.
if not fs.rename(spark._jvm.Path('write_test.csv/' + file), target_file):
    raise IOError("could not move 'write_test.csv/%s' to 'write_test2.csv'" % file)

fs.delete(source_dir, True)


True

In [24]:
# Write with Spark's default file layout (no partition columns),
# replacing any existing output under 'parquet/'.
users1_2.write.parquet('parquet/', mode='overwrite')

In [25]:
# Write the same data partitioned on the `gender` column,
# replacing any existing output under 'part_parquet/'.
users1_2.write.parquet('part_parquet/', mode='overwrite', partitionBy='gender')