In [1]:
from pyspark.sql import *
from pyspark import SparkConf
from helpers.logger import Log4J
import os
import sys
from configparser import ConfigParser

In [2]:
# Load application settings from config.ini (path is relative to the current
# working directory).
config = ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Check that list here; otherwise a missing file
# only surfaces below as an opaque KeyError on the section lookup.
if not config.read("config.ini"):
    raise FileNotFoundError(
        "config.ini was not found (or could not be read) in the current working directory"
    )

# Section of Spark options that is later copied into the SparkConf.
# NOTE: a default ConfigParser lower-cases option names when parsing.
spark_config = config["SPARK_APP_CONFIG"]
# Location of the CSV data set read by the cells below.
dataset_path = config["file_path"]["data_set_path"]

In [3]:
# Build the SparkConf from every option in the SPARK_APP_CONFIG section,
# echoing each whitespace-trimmed value as it is applied.
conf = SparkConf()
for option_name, raw_value in spark_config.items():
    option_value = raw_value.strip()
    print(option_value)
    conf.set(option_name, option_value)

# Driver JVM options are set explicitly after the loop.
# NOTE(review): spark_config comes from a default ConfigParser, which
# lower-cases option names, so a camelCase key from config.ini would reach the
# loop lower-cased -- presumably why this option is set by hand here; confirm.
# conf.set() returns the SparkConf, so as the last expression it is echoed too.
conf.set(
    "spark.driver.extraJavaOptions",
    "-Dlog4j.configuration=file:log4j.properties -Dspark.yarn.app.container.log.dir=app-logs -Dlogfile.name=ashish-spark",
)

Ashish_private_org
local[3]
-Dlog4j.configuration=file:log4j.properties
Ashish


<pyspark.conf.SparkConf at 0x7516fc13bb00>

In [4]:
# Reuse an existing SparkSession if there is one; otherwise create one,
# passing in the SparkConf built above.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

25/02/21 08:15:52 WARN Utils: Your hostname, lenovo resolves to a loopback address: 127.0.1.1; using 192.168.29.125 instead (on interface wlp3s0)
25/02/21 08:15:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/02/21 08:15:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Wrap the session in the Log4J helper from helpers.logger and log start-up.
logger = Log4J(spark_session=spark)
logger.info("Started")

25/02/21 08:15:58 INFO Ashish_private_org: Started


To create a simple PySpark DataFrame, we only need the file path, which we read through the `.read` attribute; `.read` gives us the ability to read different file formats.

In [6]:
# Read the CSV with the reader's default options (no .option() calls).
df = spark.read.csv(path=dataset_path)

                                                                                

Now, to print the DataFrame, we call the `.show()` function, which is an action.  
There are two ways to show the DataFrame:
- df.show()
- display(df)

In [7]:
# Same as a bare df.show(), with the defaults spelled out: print the first
# 20 rows and truncate long cell values.
df.show(n=20, truncate=True)

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|    _c0|    _c1|                 _c2|                 _c3|                 _c4|                 _c5|               _c6|         _c7|   _c8|      _c9|                _c10|                _c11|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                NULL|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Wat

**However, the output above did not give us the header (column names) of the CSV file.**  
To get the headers, we need to set `header` to `true` via `.option`.  
This requires the first row of the CSV file to contain the column names.

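We can refer here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_csv.html?highlight=headers  
to learn more about the CSV methods (note that this page documents the pandas-on-Spark `read_csv`; the options accepted by `spark.read.option(...)` are listed in the Spark SQL "CSV Files" data source guide). Let's see what `.option` accepts.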

In [8]:
# Same CSV read, but with the header option enabled so the first row is used
# for the column names.
df_with_column = (
    spark.read
    .option("header", "true")
    .csv(dataset_path)
)

df_with_column.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                NULL|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                NULL|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

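Let's use `display` to view the dataset as a SQL-like table (in a plain Jupyter session, as the output below shows, it prints only the DataFrame's schema).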

In [12]:
# display() is mostly useful in notebook environments such as Databricks or Jupyter.
# NOTE(review): in this Jupyter run it printed only the DataFrame's schema
# repr, not a table of rows.
display(df_with_column)

DataFrame[show_id: string, type: string, title: string, director: string, cast: string, country: string, date_added: string, release_year: string, rating: string, duration: string, listed_in: string, description: string]

We also want our DataFrame to **infer the schema** from the data.  
- This option allows the reader to scan a portion of the file and guess the data type of each column.
- Keep in mind that this option only works in some cases.
- In many cases, it fails to infer the column types correctly.

In [13]:
# Read the CSV with both the header and inferSchema options enabled.
df_infer_schema = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(dataset_path)
)

df_infer_schema.show()

                                                                                

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                NULL|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                NULL|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

In [14]:
# In this Jupyter run display() prints the DataFrame's schema repr, which is
# how the column types produced by inferSchema are checked here.
display(df_infer_schema)

DataFrame[show_id: string, type: string, title: string, director: string, cast: string, country: string, date_added: string, release_year: string, rating: string, duration: string, listed_in: string, description: string]