<a href="https://colab.research.google.com/github/PawelJakubczyk/Chess/blob/master/pyspark_crud_read.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install Library

In [4]:
%pip install pyspark

Collecting pyspark
  Using cached pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=9ddb01b53e1538ef5eee46b23f41c856402182b8bc6bf345a0c2a12374b1dc04
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


# Create PySpark DF

## Import Library

In [5]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

## Start Spark Session

In [6]:
# Build the session shared by every cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

## Define Schema

In [None]:
# Explicit schema reused by the readers below.
# Each pair is (column name, Spark data type), listed in column order.
custom_schema = StructType(
    [
        StructField(column_name, column_type)
        for column_name, column_type in (
            ("ID", StringType()),
            ("Subsector", StringType()),
            ("Category", StringType()),
            ("Brand", StringType()),
            ("Material_ID", IntegerType()),
            ("Description", StringType()),
            ("Plant_Code", StringType()),
            ("Plant_Name", StringType()),
            ("Validity_Date_From", DateType()),
            ("Validity_Date_To", DateType()),
            ("Modification_Date", DateType()),
            ("Status", StringType()),
        )
    ]
)

### Create Empty DF

In [None]:
# Variant 1: create an empty RDD first, then attach the schema to it.
emptyRDD = spark.sparkContext.emptyRDD()
df_empty = spark.createDataFrame(emptyRDD, custom_schema)

# Variant 2: pass an empty list directly with the same schema.
# This reassigns df_empty, so only this second DataFrame stays bound to the name.
df_empty = spark.createDataFrame([], custom_schema)

### Create DF from variables

In [None]:

# Sample course data: one inner list per row, one entry per column.
data = [["node.js", "dbms", "integration"],
        ["jsp", "SQL", "trigonometry"],
        ["php", "oracle", "statistics"],
        [".net", "db2", "Machine Learning"]]

# Column names for the three-column rows above.
columns = ["Web Technologies", "Data bases", "Maths"]

# createDataFrame accepts a single schema argument. Passing `columns`
# positionally together with schema=custom_schema raises
# TypeError (multiple values for argument 'schema'), and the 12-field
# custom_schema does not describe these 3-column rows anyway.
# Supplying only the column names lets Spark infer the column types.
dataframe = spark.createDataFrame(data, schema=columns)

### Create DF from file

#### xlsx

In [None]:
xlsx_file_path = "path/to/your/xlsx/file.xlsx"
sheet_name = "name_of_your_excel_sheet"

# Option 1: read the sheet with pandas, then convert to a Spark DataFrame.
# `inferSchema` and `schema` are Spark reader arguments, not pandas.read_excel
# parameters, so the Spark schema is applied in createDataFrame instead.
# NOTE(review): the pandas column dtypes must be compatible with custom_schema — confirm on real data.
df_xlsx_pd = pd.read_excel(xlsx_file_path, sheet_name=sheet_name)
df_xlsx = spark.createDataFrame(df_xlsx_pd, schema=custom_schema)

# Option 2: read directly with the spark-excel data source
# (the com.crealytics:spark-excel package must be installed for the session).
# - current spark-excel releases name the header option "header" (older ones used "useHeader")
# - dataAddress is "'<sheet>'!<top-left cell>"
# - no inferSchema: the explicit schema below takes precedence
df_xlsx = (spark.read.format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("dataAddress", f"'{sheet_name}'!A1")
    .schema(custom_schema)
    .load(xlsx_file_path))

#### csv

In [None]:
csv_file_path = "path/to/your/csv/file.csv"

# An explicit schema is supplied, so schema inference is not requested:
# combining inferSchema with schema= is contradictory and the explicit schema wins.

# Shorthand reader; header=True treats the first line as column names.
df_csv = spark.read.csv(csv_file_path, schema=custom_schema, header=True)

# Equivalent generic reader (reassigns df_csv with the same data).
df_csv = (spark.read.format("csv")
    .option("header", "true")
    .schema(custom_schema)
    .load(csv_file_path))

#### json

In [None]:
json_file_path = "path/to/your/json/file.json"

# DataFrameReader.json has no `header` or `inferSchema` parameters (passing
# them raises TypeError); JSON records carry their own field names, so only
# the schema is supplied.
df_json = spark.read.json(json_file_path, schema=custom_schema)

# Equivalent generic reader (reassigns df_json with the same data).
df_json = (spark.read.format("json")
    .schema(custom_schema)
    .load(json_file_path))

#### parquet

In [None]:
# Placeholder path: point this at a parquet file or folder
# (the previous placeholder was a copy of the JSON example path).
parquet_file_path = "path/to/your/parquet/file.parquet"

# Parquet files store their own schema, so none is passed to the reader.
# Shorthand reader:
df_hist_del = spark.read.parquet(parquet_file_path)
# Equivalent generic reader:
df_read_history = spark.read.format("parquet").load(parquet_file_path)

#### delta

In [None]:
delta_files_path = "path/to/your/delta/folder"

# Reading Delta requires the Delta Lake package to be available to the Spark session.
# Generic reader with an explicit format:
df_delta = spark.read.format("delta").load(delta_files_path)
# Same read using load()'s `format` keyword. DataFrameReader has no `delta`
# attribute, so `spark.read.delta` would raise AttributeError.
df_delta = spark.read.load(delta_files_path, format="delta")

#### All formats

In [13]:
# Placeholder location of the data to load; replace with a real file or folder path.
path = "path/to/your/file_or_folder"

def create_df(df_format:str, path:str, sheet_name:str = "", custom_schema = ""):
    """Load a Spark DataFrame from `path` using one of the supported formats.

    Uses the notebook-level `spark` session.

    Args:
        df_format: One of "csv", "xlsx", "json", "parquet", "delta".
        path: File or folder to read.
        sheet_name: Excel sheet to read; required when df_format is "xlsx".
        custom_schema: Schema passed to the reader's ``.schema()``; required
            for "csv", "xlsx" and "json". It is not applied for "parquet"
            and "delta", which store their own schema.

    Returns:
        The DataFrame produced by the configured reader.

    Raises:
        ValueError: If the format is unsupported, a required schema is
            missing, or "xlsx" is requested without a sheet name.
    """
    formats_list = ["csv", "xlsx", "json", "parquet", "delta"]

    if df_format not in formats_list:
        raise ValueError(f"the function only supports selected formats: {formats_list}")

    # Formats that carry their schema inside the data files.
    stores_schema = df_format in ("parquet", "delta")

    # Validate all arguments before touching the Spark session.
    if not stores_schema and not custom_schema:
        raise ValueError("the selected df_format does not store a schema, you must provide the schema argument")
    if df_format == "xlsx" and sheet_name == '':
        raise ValueError("for the xlsx df_format, the sheets variable is required")

    # Excel is read through the spark-excel data source rather than a built-in format.
    source_name = "com.crealytics.spark.excel" if df_format == "xlsx" else df_format

    # Configure the reader with direct method calls; the previous eval() of a
    # hand-assembled string was both malformed and unsafe.
    reader = spark.read.format(source_name)

    if df_format in ("csv", "xlsx"):
        # Only tabular text/Excel sources have a header row to consume.
        reader = reader.option("header", "true")
    if df_format == "xlsx":
        # dataAddress is "'<sheet>'!<top-left cell>".
        reader = reader.option("dataAddress", f"'{sheet_name}'!A1")
    if not stores_schema:
        # The explicit schema replaces inference, so inferSchema is not set.
        reader = reader.schema(custom_schema)

    return reader.load(path)