In [27]:
from pyspark.sql import SparkSession

from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, LongType, FloatType, DoubleType,
    StringType, DateType, TimestampType, BooleanType,
    MapType, ArrayType
)

import pyspark.sql.functions as f

In [None]:
with SparkSession.builder.appName("pyspark-struktura-danych").getOrCreate() as spark:
    # Definiowanie schematu
    product_schema = StructType(
        [
            StructField("ID", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("cost_net", DoubleType(), False),
            StructField("vat", IntegerType(), False),
            StructField("cost_gross", DoubleType(), False),
            StructField("category", StringType(), False)
        ]
    )
    
    df = spark.read.format("csv") \
        .options(header=True, delimeter=",", schema=product_schema) \
        .load("./work/data/products.csv")

    df.show(2)
    df.printSchema()

+---+----------+--------+---+----------+--------+
| id|      name|cost_net|vat|cost_gross|category|
+---+----------+--------+---+----------+--------+
|  1|Bindownica| 2003.46|  8|   2163.74|       B|
|  2| Telewizor| 1144.24|  5|   1201.45|       H|
+---+----------+--------+---+----------+--------+
only showing top 2 rows

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- cost_net: string (nullable = true)
 |-- vat: string (nullable = true)
 |-- cost_gross: string (nullable = true)
 |-- category: string (nullable = true)



In [40]:
with SparkSession.builder.appName("pyspark-struktura-danych").getOrCreate() as spark:
    # Definiowanie schematu
    product_schema = StructType(
        [
            StructField("ID", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("cost_net", DoubleType(), False),
            StructField("vat", IntegerType(), False),
            StructField("cost_gross", DoubleType(), False),
            StructField("category", StringType(), False)
        ]
    )
    
    df = spark.read.format("csv") \
        .options(header=True, delimeter=",", schema=product_schema) \
        .load("./work/data/products.csv")
        
    df_final = spark.createDataFrame([], schema=product_schema)

    # Rzutowanie/Castowaniem typów danych
    df_pre_csv = df.select(
        f.col("id").cast(IntegerType()).alias("ID"),
        f.col("name").cast(StringType()),
        f.col("cost_net").cast(DoubleType()),
        f.col("vat").cast(IntegerType()),
        f.col("cost_gross").cast(DoubleType()),
        f.col("category").cast(StringType())
    )
    
    df_final = df_final.union(df_pre_csv)
    df_final.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- cost_net: double (nullable = true)
 |-- vat: integer (nullable = true)
 |-- cost_gross: double (nullable = true)
 |-- category: string (nullable = true)



In [6]:
with SparkSession.builder.appName("pyspark-files").getOrCreate() as spark:
    df = spark.read.format("json").options(multiLine=True, inferSchema=False).load("./work/data/users.json")
    df.show()
    df.printSchema()

+------------+---------------------+----+--------+--------------------------------+---------+------+
|balans_konta|data_utworzenia_konta|imie|nazwisko|timestamp_ostatniej_aktualizacji|typ_konta|waluta|
+------------+---------------------+----+--------+--------------------------------+---------+------+
|     1500.75|           2021-03-15| Jan|Kowalski|            2023-10-05T14:30:00Z|      VIP|  NULL|
|      2300.5|           2020-07-22|Anna|   Nowak|            2023-09-28T09:45:00Z|     NULL|   PLN|
+------------+---------------------+----+--------+--------------------------------+---------+------+

root
 |-- balans_konta: double (nullable = true)
 |-- data_utworzenia_konta: string (nullable = true)
 |-- imie: string (nullable = true)
 |-- nazwisko: string (nullable = true)
 |-- timestamp_ostatniej_aktualizacji: string (nullable = true)
 |-- typ_konta: string (nullable = true)
 |-- waluta: string (nullable = true)



In [53]:
with SparkSession.builder.appName("pyspark-files").getOrCreate() as spark:
    users_schema = StructType(
        [
            StructField("fname", StringType(), False),
            StructField("lname", StringType(), False),
            StructField("balance", DoubleType(), False),
            StructField("updated_at", TimestampType(), False),
        ]
    )
    
    df = spark.read.format("json").options(multiLine=True).load("./work/data/users.json")

    df_users = df.select(
        f.col("imie").cast(StringType()).alias("fname"),
        f.col("nazwisko").cast(StringType()).alias("lname"),
        f.col("balans_konta").cast(DoubleType()).alias("balance"),
        f.col("timestamp_ostatniej_aktualizacji").cast(TimestampType()).alias("updated_at"),
    ).show()

+-----+--------+-------+-------------------+
|fname|   lname|balance|         updated_at|
+-----+--------+-------+-------------------+
|  Jan|Kowalski|1500.75|2023-10-05 14:30:00|
| Anna|   Nowak| 2300.5|2023-09-28 09:45:00|
+-----+--------+-------+-------------------+

