## 01-pyspark-read-csv.py

In [0]:
# 01-pyspark-read-csv.py
# import findspark    # Required for Jupyter Notebook or Jupyter Lab
# findspark.init()    # Required for Jupyter Notebook or Jupyter Lab
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

# Start a Spark session for this notebook, or reuse the one that already exists.
spark = (
    SparkSession.builder
    .appName('PySparkExamples')
    .getOrCreate()
)

In [0]:
# ------------------------------------------- In Jupyter Lab / Notebook Platform
# Location of the iris dataset on the local machine. Kept in one place so the
# three reads below stay in sync when the file is moved.
IRIS_CSV_PATH = "E:/datafile/iris.csv"

# DataFrame-1: plain read with no options, so the header row is not detected.
print ('Processing DataFrame-1...')
df1 = spark.read.csv(IRIS_CSV_PATH)
print("DataFrame columns are:", df1.columns, "with column count:", len(df1.columns), "and with row count:", df1.count())
df1.printSchema()
df1.show()

# DataFrame-2: a single option() call tells the reader the file has a header.
print ('Processing DataFrame-2...')
df2 = (
    spark.read
    .option("header", True)
    .csv(IRIS_CSV_PATH)
)
df2.printSchema()
df2.show()

# DataFrame-3: options() sets several reader options (header and delimiter) at once.
print ('Processing DataFrame-3...')
df3 = (
    spark.read
    .options(header = 'True', delimiter = ',')
    .csv(IRIS_CSV_PATH)
)
df3.printSchema()
df3.show()

In [0]:
# ------------------------------------------- In Google CoLab Platform
!pip3 install pyspark
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("PySparkExamples").getOrCreate()
from google.colab import drive
drive.mount('/content/gdrive')
# upload iris.csv file on the Google Drive Colab Notebooks folder 
# Read the CSV file into a DataFrame
df4 = spark.read.csv("/content/gdrive/My Drive/Colab Notebooks/iris.csv", 
                    header = True, inferSchema = True)
# Display the contents of the DataFrame
df4.printSchema()
df4.show()

In [0]:
# ------------------------------------------- In Databricks Platform
# DBFS location of the uploaded dataset (Spark API format).
dbfs_iris_path = "dbfs:/FileStore/tables/iris.csv"

# DataFrame-5: header-aware read without an explicit schema.
print ('Processing DataFrame-5...')
df5 = (
    spark.read
    .options(header = 'True', delimiter = ',')
    .csv(dbfs_iris_path)
)
print("DataFrame columns are:", df5.columns, "with column count:", len(df5.columns), "and with row count:", df5.count())
df5.printSchema()
df5.show()  # shows the first 20 rows by default

# Explicit schema: four nullable double measurements plus a nullable string label.
iris_schema = StructType([
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
    StructField("species", StringType(), True),
])

# DataFrame-6: apply the schema through the options(...).schema(...).csv(...) chain.
print ('Processing DataFrame-6...')
df6 = (
    spark.read
    .options(header = 'True', delimiter = ',')
    .schema(iris_schema)
    .csv(dbfs_iris_path)
)
df6.printSchema()
df6.show(5)

# DataFrame-7: same schema, using the generic format("csv") ... load(...) form.
print ('Processing DataFrame-7...')
df7 = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(iris_schema)
    .load(dbfs_iris_path)
)
df7.printSchema()
df7.show(5)

Processing DataFrame-5...
DataFrame columns are: ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species'] with column count: 5 and with row count: 150
root
 |-- sepal_length: string (nullable = true)
 |-- sepal_width: string (nullable = true)
 |-- petal_length: string (nullable = true)
 |-- petal_width: string (nullable = true)
 |-- species: string (nullable = true)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|          3|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|           5|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|           5

In [0]:
# Write DataFrame-6 back to DBFS as CSV with a header row.
# The target is a directory that receives part files; mode("overwrite")
# replaces whatever an earlier run left there.
# The original wrote df7 although the message names DataFrame-6; df6 is used
# so that the code matches the message.
print ('Creating part files from DataFrame-6...')
(
    df6.write
    .option("header", True)
    .mode("overwrite")
    .csv("dbfs:/FileStore/tables/iris_write")
)

Creating part files from DataFrame-6...


In [0]:
# Rename the part file to a shorter, stable name.
# The part-file name contains a generated id (the "tid-..." segment), so it is
# looked up with dbutils instead of being copied by hand from the Data tab.
iris_write_dir = "dbfs:/FileStore/tables/iris_write"
part_files = [
    entry.path
    for entry in dbutils.fs.ls(iris_write_dir)
    if entry.name.startswith("part-") and entry.name.endswith(".csv")
]

# Fail loudly rather than renaming an arbitrary file when the directory does
# not hold exactly one CSV part file (e.g. the data was written in several partitions).
if len(part_files) != 1:
    raise RuntimeError(
        f"Expected exactly one CSV part file in {iris_write_dir}, found {len(part_files)}"
    )

dbutils.fs.mv(part_files[0], iris_write_dir + "/partfile.csv")

In [0]:
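# Example (kept for reference only): renaming a part file with %fs mv using its full generated name; the generated name will differ on your cluster.
# %fs mv dbfs:/FileStore/tables/iris_write/part-00000-tid-6007287614469198718-3573cc64-eeab-4ef0-9e68-db07aa4e830e-6204-1-c000.csv dbfs:/FileStore/tables/iris_write/partfile.csv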

In [0]:
# Read the renamed part file back to check the CSV that was written.
print ('Processing DataFrame-8 from part file created earlier...')
partfile_path = "dbfs:/FileStore/tables/iris_write/partfile.csv"
df8 = (
    spark.read
    .options(header = 'True', delimiter = ',')
    .csv(partfile_path)
)
df8.printSchema()
df8.show()

Processing DataFrame-8 from part file created earlier...
root
 |-- sepal_length: string (nullable = true)
 |-- sepal_width: string (nullable = true)
 |-- petal_length: string (nullable = true)
 |-- petal_width: string (nullable = true)
 |-- species: string (nullable = true)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|   

### Databricks File System (DBFS) commands:
**To list filesystem**<br>
%fs ls<br>
%fs ls dbfs:/FileStore/tables<br>

**To rename a file**<br>
%fs mv dbfs:/FileStore/tables/old_file_name dbfs:/FileStore/tables/new_file_name

**To delete a file from DBFS**<br>
dbutils.fs.rm("dbfs:/FileStore/tables/your_file_name")<br>
%fs rm dbfs:/FileStore/tables/your_file_name

**To delete a folder from DBFS**<br>
%fs rm -r dbfs:/FileStore/tables/your_folder_name

**To copy a file in DBFS**<br>
%fs cp dbfs:/FileStore/tables/old_file_name dbfs:/FileStore/tables/new_file_name

**To move a file to another folder in DBFS**<br>
%fs mv dbfs:/FileStore/tables/old_file_name dbfs:/FileStore/tables/target_folder

**To create a folder in DBFS**<br>
%fs mkdirs dbfs:/FileStore/tables/mydir