# Read Files, Schema and Datatypes, Renaming Columns, Writing

## Dataframe Creation

A PySpark DataFrame can be created with **pyspark.sql.SparkSession.createDataFrame**, typically by passing a list of lists, tuples, dictionaries, or pyspark.sql.Row objects.
<br>

**pyspark.sql.SparkSession.createDataFrame** accepts a `schema` argument that specifies the schema of the DataFrame. When it is omitted, PySpark infers the schema by sampling the data.

In [0]:
from datetime import datetime, date
from pyspark.sql import Row

df_row = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df_row

In [0]:
from datetime import datetime, date
df = spark.createDataFrame([
    (1, 2., 'name1', date(2023, 1, 1), datetime(2023, 1, 1, 12, 0)),
    (2, 3., 'name2', date(2023, 2, 1), datetime(2023, 1, 2, 12, 0)),
    (3, 4., 'name3', date(2023, 3, 1), datetime(2023, 1, 3, 12, 0))
], schema='id long, value double, name string, date date, time timestamp')
df.show()

## How to read different file types?
[Reading PySpark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html#)
<br>


### json
[pyspark.sql.DataFrameReader.json](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html)

In [0]:
df_json = spark.read.json('/mnt/adl2/d3/70_training_dataset_D3/AMN_BIA/Databricks/netflix_top_series/netflix_top.json')

In [0]:
df_json.printSchema()

In [0]:
display(df_json)

### csv
[pyspark.sql.DataFrameReader.csv](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html)

In [0]:
# With no options, the header row is read as data and every column is typed as string
df_csv = spark.read.csv('/mnt/adl2/d3/70_training_dataset_D3/AMN_BIA/Databricks/netflix_top_series/netflix_top.csv')
df_csv.display()

In [0]:
# header=True uses the first row as column names; "," is already the default delimiter
df_csv2 = spark.read.options(delimiter=",", header=True).csv('/mnt/adl2/d3/70_training_dataset_D3/AMN_BIA/Databricks/netflix_top_series/netflix_top.csv')
df_csv2.display()

In [0]:
# Same read through format('csv').load(...); inferSchema=True also infers the column types
df_csv3 = spark.read.option('delimiter', ',').options(header=True, inferSchema=True).format('csv').load('/mnt/adl2/d3/70_training_dataset_D3/AMN_BIA/Databricks/netflix_top_series/netflix_top.csv')
df_csv3.display()

In [0]:
# df_csv = spark.read.option('delimiter',',').options(header=True,inferSchema=True).format('csv').load('/mnt/adl2/d3/70_training_dataset_D3/public_datasets/Disney/disney_plus_titles.csv')
# df_csv.display()

In [0]:
# df_csv2 = spark.read.options(delimiter=";",header=True).csv('/databricks-datasets/wine-quality/winequality-red.csv')
# display(df_csv2)

In [0]:
# df_csv3 = spark.read.options(delimiter=";",header=True).csv('/databricks-datasets/wine-quality/winequality-white.csv')
# display(df_csv3)

### parquet
[pyspark.sql.DataFrameReader.parquet](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html)

In [0]:
df_parquet = spark.read.parquet('/mnt/adl2/d3/70_training_dataset_D3/AMN_BIA/Databricks/netflix_top_series/*.parquet')
display(df_parquet)

### excel

In [0]:
# Read the Excel file by sheet index instead of sheet name
# ("0!A1": first sheet, starting at cell A1)
df_excel = (
    spark.read
    .format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("inferSchema", "false")
    .option("dataAddress", "0!A1")
    .load('/mnt/adl2/d3/70_training_dataset_D3/AMN_BIA/Databricks/netflix_top_series/netflix_top.xlsx')
)
df_excel.display()

In [0]:
df_excel = (
    spark.read
    .format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("inferSchema", "true")
    .load('/mnt/adl2/d3/70_training_dataset_D3/AMN_BIA/Databricks/netflix_top_series/netflix_top.xlsx')
)
df_excel.display()

## Schema and Datatypes
[Data Types Docs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/data_types.html)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
# The inferred JSON schema differs from the other files: the columns come back in a different order
df_json.printSchema()

In [0]:
# describe() returns summary statistics per column, which helps to determine the data types
display(df_json.describe())

In [0]:
# Let's define the schema manually for the current file

schema_json = StructType(fields=[StructField('rank',IntegerType(), True),
                                 StructField('show_title',StringType(), True),
                                 StructField('category',StringType(), True),
                                 StructField('language',StringType(), True),
                                 StructField('season_title',StringType(), True),
                                 StructField('hours_viewed_first_28_days',IntegerType(), True)
])

In [0]:
# Read the file again, this time assigning the schema
df_json_schema = spark.read.schema(schema_json).json('/mnt/adl2/d3/70_training_dataset_D3/AMN_BIA/Databricks/netflix_top_series/netflix_top.json')
df_json_schema.display()

In [0]:
# Confirm that the schema was applied
df_json_schema.printSchema()

## Renaming Columns

In [0]:
# withColumnRenamed renames a single column to match the desired output.
# To rename several columns at once, a single select with aliases is usually cleaner.
display(df.withColumnRenamed('value','amount'))

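## How to write to the CDL? (parquet, csv)
PySpark is the Python API for Apache Spark and provides a user-friendly interface to Spark's distributed computing capabilities. <br>
DataFrames are written through `DataFrame.write`, which offers one method per format (`parquet`, `csv`) together with a save mode and format-specific options.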

In [0]:
e_number = 'E055026'

In [0]:
# coalesce(1) produces a single output file; Parquet stores the schema itself, so no header option is needed
df_parquet.coalesce(1).write.mode('overwrite').parquet(f'/mnt/adl2/private/50_user/{e_number}/PySpark/Training/netflix_top_series/parquet/')

In [0]:
# CSV has no embedded schema: write the column names as a header row and use "|" as the separator
df_parquet.coalesce(1).write.mode('overwrite').options(header=True, sep="|").csv(f'/mnt/adl2/private/50_user/{e_number}/PySpark/Training/netflix_top_series/csv/')
