In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, from_json, date_format, to_timestamp
from pyspark.sql.types import *
from pyspark.sql import functions as F

spark.conf.set("spark.sql.shuffle.partitions", 14)

spark = SparkSession. \
    builder. \
    appName("Data Sources"). \
    master("local"). \
    config("spark.jars", "../jars/postgresql-42.2.19.jar"). \
    getOrCreate()


# Read/Write DataFrame with file system, HDFS, S3, FTP

In [None]:
# config("spark.python.worker.memory", "8g"). \
#     config("spark.driver.memory", "8g"). \
#     config("spark.executor.memory", "8g"). \
#  \


In [6]:
cars_df = spark.read. \
    format("json"). \
    option("inferSchema", "true"). \
    option("mode", "failFast"). \
    option("path", "data/cars"). \
    load()

cars_df.show()



+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|Acceleration|Cylinders|Displacement|Horsepower|Miles_per_Gallon|                Name|Origin|Weight_in_lbs|      Year|
+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|        12.0|        8|       307.0|       130|            18.0|chevrolet chevell...|   USA|         3504|      null|
|        12.0|        8|       307.0|       130|            18.0|chevrolet chevell...|   USA|         3504|1970-01-01|
|        11.5|        8|       350.0|       165|            15.0|   buick skylark 320|   USA|         3693|1970-01-01|
|        11.0|        8|       318.0|       150|            18.0|  plymouth satellite|   USA|         3436|1970-01-01|
|        12.0|        8|       304.0|       150|            16.0|       amc rebel sst|   USA|         3433|1970-01-01|
|        10.5|        8|       302.0|       140|

In [None]:
# HDFS
# option("path", "hdfs://nn1home:8020/sources/cars"). \

# FTP
# option("path", "ftp://user:pwd/192.168.1.5/sources/cars"). \

# S3
# option("path", s3://bucket-name/sources/cars)


In [7]:
cars_df_v2 = spark.read. \
    format("json"). \
    options(mode="failFast", path="data/cars", inferSchema="true"). \
    load()

cars_df_v2.show()

         # /sources/cars
# 10.1.1.1 node1 -> block1     S3 NETWORK                             -> partition1 -> task1
# 10.1.1.2 node2 -> block2 -> Spark Driver -> Name Node -> ip adress -> partition2 -> task2
# 10.1.1.3 node3 -> block2                                           -> parttion3 -> task3


+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|Acceleration|Cylinders|Displacement|Horsepower|Miles_per_Gallon|                Name|Origin|Weight_in_lbs|      Year|
+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|        12.0|        8|       307.0|       130|            18.0|chevrolet chevell...|   USA|         3504|      null|
|        12.0|        8|       307.0|       130|            18.0|chevrolet chevell...|   USA|         3504|1970-01-01|
|        11.5|        8|       350.0|       165|            15.0|   buick skylark 320|   USA|         3693|1970-01-01|
|        11.0|        8|       318.0|       150|            18.0|  plymouth satellite|   USA|         3436|1970-01-01|
|        12.0|        8|       304.0|       150|            16.0|       amc rebel sst|   USA|         3433|1970-01-01|
|        10.5|        8|       302.0|       140|

In [8]:
cars_df.\
    repartition(3). \
    write. \
    mode("overwrite"). \
    option("compression", "snappy"). \
    parquet("data/parquet"). \
    save()

 #    partitionBy("Year"). \
 # \

# A lot of small files problem
# repartition(3) => round robin
# repartition(col("field")) => hash partitioning
# repartition(3) + partitionBy("Year") NOT GOOD
# repartition(col("field")) + partitionBy("Year") GOOD

# Parquet = binary data, high compression, low CPU usage, very fast
# also contains the schema
# the default data format in Spark


AttributeError: 'NoneType' object has no attribute 'save'

In [2]:


# stocks_df.write.save("data/stocks_parquet")

# each row is a value in a DF with a SINGLE column ("value")
text_df = spark.read.text("data/lipsum")
text_df.show()

# !!!!!!!!!!!!! DIFFERENCE between saveAsTable() and write

AnalysisException: Path does not exist: file:/home/jovyan/notebooks/data/lipsum

# data_formats_json_avro_parquet

In [None]:
state_names_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("data/statenames")

state_names_df.show()
state_names_df.printSchema()

state_names_df \
    .coalesce(1) \
    .write \
    .mode("overwrite") \
    .parquet("data/target/statenames_parquet")


# jdbc_postgres_oracle

In [None]:
DBPARAMS = {
    "user": user,
    "password": password,
    "driver": driver
}

employees = "public.employees"
employees_pruned = """(select e.first_name, e.last_name, e.hire_date from public.employees e where e.gender = 'F') as new_emp"""

# 10101        99999
# 10102        99998
# 10103        10103

# df = spark.\
#     read.\
#     jdbc(url=url, table=employees, properties=DBPARAMS)

# print("GET NUM PARTITIONS")
# print(df.rdd.getNumPartitions())

# df.printSchema()
# df.agg(F.max(F.col("emp_no")), F.min(F.col("emp_no"))).show()


df = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS,
                     column="emp_no", lowerBound = 10010, upperBound = 499990, numPartitions = 10)

# lowerBound = 10010
# upperBound = 499990
#
# ex1 => part1 => select * from public.employees e where e.emp_num > x and e.emp_num
# ex2 => part2 =

pred = ["gender = 'F'", "gender = 'M'", "gender = 'M'"]
# be carefully with borders
pred2 = ["emp_no > 10010 and emp_no <= 50000", "emp_no >= 50000 and emp_no <= 100000"]

df = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS, predicates =pred)
df.show()

# lowerBound = 10010,
# upperBound = 499990,
# numPartitions = 20,

# Killer joins => optimised UDF

# print("GET NUM PARTITIONS")
# print(df.rdd.getNumPartitions())
#
# df.show()


In [None]:
employees_df = spark.read. \
    format("jdbc"). \
    option("driver", driver). \
    option("url", url). \
    option("user", user). \
    option("password", password). \
    option("dbtable", "public.employees"). \
    load()


In [None]:
# department_df = spark.read (dept_no, dept_name) // 200
#
# employees_df. \
#     groupBy("dept_no"). \
#     count(). \
#     join(department_df, col("dept_no") = col("dept_no"),  "inner")

# Solution1 UDF
#

print("GET NUM PARTITIONS")
print(employees_df.rdd.getNumPartitions())


employees_df.show()

employees_df.write.bucketBy(10, "emp_no").sortBy("emp_no").mode("overwrite").saveAsTable("employee_bucketed")
# employees_df.write.mode("overwrite").save() Parquet



# queue_kafka

In [None]:
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("page", StringType())
])


# source_batch_df = spark.read\
#     .format("kafka")\
#     .option("kafka.bootstrap.servers", "localhost:29092")\
#     .option("subscribe", "input")\
#     .load()
#
# print(source_batch_df.isStreaming)
#
# source_batch_df.show()


source_streaming_df = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:29092")\
    .option("subscribe", "input")\
    .load()

print(source_streaming_df.isStreaming)

typed_source_streaming_df = source_streaming_df.\
    select(expr("cast(value as string) as actualValue")).\
    select(from_json(col("actualValue"), schema).alias("page")).\
    selectExpr("page.timestamp as timestamp", "page.page as page").\
    select(date_format(to_timestamp(col("timestamp"), "dd-MM-yyyy HH:mm:ss:SSS"), "HH:mm:ss:SSS").alias("time"),col("page")
  )

source_streaming_df.\
    writeStream.\
    outputMode("append").\
    foreachBatch(lambda b, l: b.show).\
    trigger(processingTime='3 seconds').\
    start().\
    awaitTermination()


Exercise: read the movies DF, then write it as
- tab-separated "CSV"
- parquet
- table "public.movies" in the Postgres DB

Exercise #2: find a way to read the people-1m dataFrame. Then write it as JSON.