# Reading files, DataFrames and parsing JSON data

### Note: Creating dataframe with single field
> `data = [("Alice",)]` or `data = [["Alice"]]`

If we write `data = [("Alice")]`, Python evaluates it as `data = ["Alice"]`: parentheses around a single value do not create a tuple, so the result is a list of strings, not a list of tuples or lists. To create a DataFrame with a single column, use `data = [("Alice",)]` (list of tuples — note the trailing comma) or `data = [["Alice"]]` (list of lists).

When building the input rows for a PySpark DataFrame, tuples are slightly cheaper than lists on the Python side: because they are immutable, they use a little less memory and are faster to create. Unless you need to modify the rows before creating the DataFrame, tuples are generally the preferred choice.

In [0]:
from pyspark.sql import SparkSession, types as T, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, LongType
from pyspark.sql.window import Window as W
# Reuse the active Spark session if there is one, otherwise start one named 'Spark'.
spark = (
    SparkSession.builder
    .appName('Spark')
    .getOrCreate()
)

## Create DataFrame & Json field

In [0]:
# Two sample rows, each a (name, age) tuple.
data = [
    ('Biki', 29),
    ('Raveen', 25),
]
# Explicit schema: both columns are nullable.
fields = (
    StructType()
    .add('name', StringType(), True)
    .add('age', IntegerType(), True)
)
df = spark.createDataFrame(data=data, schema=fields)
# create json data column: pack all existing columns into a struct and
# serialise that struct to a JSON string
df = df.select(
    '*',
    F.to_json(F.struct(*df.columns)).alias('json_data'),
)
df.show()

## List files from a directory

In [0]:
# Directory whose files are listed and read further down (kept with its trailing slash).
filepath='/Volumes/workspace/biki/files/'

In [0]:
# Approach 1: To list files using databricks dbutils
files = dbutils.fs.ls(filepath)
print(files)
print("=========================")
# One single-entry {file name: full path} dict per listed entry.
f = [{entry.name: entry.path} for entry in files]
print(f)

In [0]:
# Approach 2: To list files using python library
import os
# NOTE: this rebinds `files` (previously the dbutils.fs.ls result) to the
# list of entry names returned by os.listdir.
files = os.listdir(filepath)
print(files)

## Reading different file formats

In [0]:
# Reader options for CSV sources: treat the first line as a header and let
# Spark infer the column types.
csvoptions = {'header': True, 'inferSchema': True}

# Explicit CSV schema, built from (column name, Spark type) pairs.
# Every column is declared nullable.
_csv_columns = [
    ('employee_id', IntegerType()),
    ('first_name', StringType()),
    ('last_name', StringType()),
    ('department', StringType()),
    ('salary', DoubleType()),
    ('join_date', DateType()),
]
csv_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in _csv_columns]
)

# Explicit JSON schema: two flat string fields plus a nested `Zip` struct.
_zip_struct = StructType([
    StructField('ZipCodeType', StringType(), True),
    StructField('Zipcode', LongType(), True),
])
json_schema = StructType([
    StructField('City', StringType(), True),
    StructField('State', StringType(), True),
    StructField('Zip', _zip_struct),
])

In [0]:
import os  # imported here so this cell does not rely on another cell's import

# Read every supported file listed in `files` into a DataFrame, picking the
# reader from the file extension. Each matching file overwrites the previous
# DataFrame of the same kind, so the last one wins:
#   .csv -> csv_df, .json -> json_df, .txt -> txt_df
for entry in files:
    # `files` holds plain names after os.listdir() but FileInfo objects after
    # dbutils.fs.ls(); accept both so the cell works with either listing.
    fname = entry if isinstance(entry, str) else entry.name
    # splitext() returns '' for a name without a dot, so a file literally
    # called "csv" is not mistaken for a CSV; lower() also matches ".CSV".
    ext = os.path.splitext(fname)[1].lower()
    # join() gives the right path whether or not `filepath` ends with a slash.
    full_path = os.path.join(filepath, fname)

    if ext == '.csv':
        # Alternatives that infer the schema instead of enforcing csv_schema:
        # csv_df=spark.read.format('csv').option('header', True).option('inferSchema', True).load(full_path)
        # csv_df=spark.read.format('csv').options(**csvoptions).load(full_path)
        csv_df = (
            spark.read.format('csv')
            .option('header', True)
            .schema(csv_schema)
            .load(full_path)
        )
        csv_df.show(truncate=False)
    elif ext == '.json':
        # Alternative that infers the schema:
        # json_df=spark.read.format('json').load(full_path)
        json_df = spark.read.format('json').schema(json_schema).load(full_path)
        json_df.show(truncate=False)
    elif ext == '.txt':
        txt_df = spark.read.format('text').load(full_path)
        txt_df.show(truncate=False)

## Analysing JSON file

In [0]:
# Keep every original column and additionally expose the fields nested under
# `Zip` as top-level columns.
json_extract_df = json_df.select('*', 'Zip.*')
json_extract_df.show()

In [0]:
# `{df}` is a named placeholder; spark.sql binds it to the DataFrame passed
# as the keyword argument of the same name.
spark.sql(
    'select * from {df}',
    df=json_df,
).show(truncate=False)

In [0]:
# Register json_df as a temporary view so it can be queried by name in SQL.
json_df.createOrReplaceTempView('json_view')
spark.sql('select * from json_view').show(truncate=False)

## Reading JSON data from a text file

In [0]:
# Extract fields from parsed json column
# Each text line sits in the `value` column; parse it with json_schema into a
# struct column named `parsed_value`.
txt_df_parsed = txt_df.withColumn(
    'parsed_value',
    F.from_json(F.col('value'), json_schema),
)
# Show all columns plus the nested Zip fields, City and State pulled out of
# the parsed struct.
txt_df_parsed.select(
    F.col('*'),
    F.col('parsed_value.Zip.*'),
    F.col('parsed_value.City'),
    F.col('parsed_value.State'),
).show()

In [0]:
# Access json value column without parsing
# Output column name -> JSONPath evaluated against the raw `value` string.
_json_paths = {
    'City': '$.City',
    'State': '$.State',
    'ZipCodeType': '$.Zip.ZipCodeType',
    'Zipcode': '$.Zip.Zipcode',
}
txt_df.select(
    '*',
    *[
        F.get_json_object('value', path).alias(alias)
        for alias, path in _json_paths.items()
    ],
).show()