# Data Reading

## List Files Using DBFS

In [0]:
display(dbutils.fs.ls("/Volumes/workspace/learn/"))

## Load File using spark
It stores the file as a dataframe

### CSV

In [0]:
df1= spark.read.format('csv')\
    .option('inferSchema', True)\
        .option('header', True)\
            .load('/Volumes/workspace/learn/test_volume/BigMart Sales.csv')

### JSON

In [0]:
df2= spark.read.format('json')\
    .option('inferSchema', True)\
        .option('header', True)\
            .load('/Volumes/workspace/learn/test_volume/drivers.json')

## Display spark dataframes

### Using display
It is a databricks only function <br/>
> NOTE: Outputs are interactive and pretty printed

Examples:
* `display(df)`
* `df.display()`

In [0]:
display(df1)

### Using show 
`df.show()`
> NOTE: Output is not pretty printed

In [0]:
df2.show(n=20)

### Displaying ranged values

Pyspark does not support pandas' iLoc because its based on a distributed architechture. It has different workarounds (might cost more)

#### Top N records

Lazy evaulation: limit <br/>
Immediate evaluation (trigger): head

In [0]:
df1.limit(2)
#lazy evaluation: returns a df

In [0]:
df1.head(2)
# immediate evaluation: returns a list of rows

#### Bottom N records

Pyspark only supports immediate action using tail method <br/>
**We have ```select top 10 * from tbl```, and not ```select bottom 10 * from tbl```**

Lazy evaulation is not possible here due to lack of global ordering (since it is distributed) <br />
Workaround? orderBy desc and then limit x (expensive)

> NOTE: both might return different outputs. To achieve correctess, one need to determine grain of the table and then order (in both cases), as tail does not deremine correctness


In [0]:
display(df1.orderBy(df1.Item_Identifier.asc()).tail(2))

In [0]:
# select top 2 * from df1 order by Item_Identifier desc
display(df1.orderBy(df1.Item_Identifier.desc()).limit(2))


#### Using custom range [x,y)

Not supported in pyspark
Pandas library supports it using iloc

Workaround: Use window function with global ordering.
> **Leads to single partition calculation and can cause serious performance degradation**

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# [x,y) x inclusive y exclusive
x, y = 5, 10

# window function. cannot use partitionBy because it group values and then return same value
w = Window\
    .orderBy(df1.Item_Identifier)   # replace "col" with the column you want to order by

# Add row numbers starting at 1
df_with_rn = df1.withColumn('row_num', row_number().over(w))

# Filter rows in the range [x, y)
result = df_with_rn.filter((df_with_rn.row_num >= x) & (df_with_rn.row_num < y)) \
                   .drop(df_with_rn.row_num)

# display
display(result)

# Schema Definition

### Print Schema

In [0]:
df1.printSchema()

### Define schema manually

#### DDL Schema

In [0]:
# changing Item_Weight from double to string
my_ddl_schema = '''
    Item_Identifier string,
    Item_Weight string,
    Item_Fat_Content string,
    Item_Visibility double,
    Item_Type string,
    Item_MRP double,
    Outlet_Identifier string,
    Outlet_Establishment_Year int,
    Outlet_Size string,
    Outlet_Location_Type string,
    Outlet_Type string,
    Item_Outlet_Sales double
'''

df1_custom_type=spark.read.format('csv')\
  .schema(my_ddl_schema) \
    .option('header', True) \
      .load('/Volumes/workspace/learn/test_volume/BigMart Sales.csv')


display(df1_custom_type)


In [0]:
df1_custom_type.printSchema()

#### StructType()

In [0]:
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [0]:
# Syntax: StructField('col_name', type(), nullable_boolean)
my_struct_schema = T.StructType([
    T.StructField('Item_Identifier', T.StringType(), True),
    T.StructField('Item_Weight', T.StringType(), True),
    T.StructField('Item_Fat_Content', T.StringType(), True),
    T.StructField('Item_Visibility', T.StringType(), True),
    T.StructField('Item_Type', T.StringType(), True),
    T.StructField('Item_MRP', T.StringType(), True),
    T.StructField('Outlet_Identifier', T.StringType(), True),
    T.StructField('Outlet_Establishment_Year', T.StringType(), True),
    T.StructField('Outlet_Size', T.StringType(), True),
    T.StructField('Outlet_Location_Type', T.StringType(), True),
    T.StructField('Outlet_Type', T.StringType(), True),
    T.StructField('Item_Outlet_Sales', T.StringType(), True)
])

In [0]:
df1_custom_type = spark.read.format('csv')\
  .schema(my_struct_schema) \
    .option('header', True) \
      .load('/Volumes/workspace/learn/test_volume/BigMart Sales.csv')

display(df1_custom_type)