### Spark Session 생성

In [None]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook (or reuse one that is already running).
builder = SparkSession.builder.appName("Chatper4")
spark = builder.getOrCreate()

### 기본 쿼리 예제 88p

In [None]:
# Location of the departure-delays CSV dataset
csv_file = "PySpark/data/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Read the CSV (first row = column names) and register it as a temporary view.
# The schema is inferred here; for larger files, specify a schema explicitly.
reader = (spark.read.format("csv")
          .option("header", "true")
          .option("inferSchema", "true"))
df = reader.load(csv_file)

df.createOrReplaceTempView("us_delay_flights_tbl")

### 90p

In [None]:
# Flights whose distance exceeds 1000, longest first
long_flights_sql = """SELECT distance, origin, destination
    FROM us_delay_flights_tbl WHERE distance > 1000
    ORDER BY distance DESC"""
spark.sql(long_flights_sql).show(10)

In [None]:
# SFO -> ORD flights with a delay above 120, largest delays first
sfo_ord_sql = """SELECT date, delay, origin, destination
    FROM us_delay_flights_tbl
    WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
    ORDER by delay DESC"""
spark.sql(sfo_ord_sql).show(10)

### 91p

In [None]:
# Label each flight by how long it was delayed.
# CASE branches are tested top to bottom, so each WHEN only needs a lower bound.
# Paired `> x AND < y` ranges would leave gaps: delays of exactly 360, 120 or 60
# would match no branch and fall through to ELSE 'Early'.
spark.sql("""SELECT delay, origin, destination,
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay > 120 THEN 'Long Delays'
                  WHEN delay > 60 THEN 'Short Delays'
                  WHEN delay > 0 THEN 'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'Early'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC""").show(10)

### 92p

In [None]:
# In Python: the same distance query expressed with the DataFrame API
from pyspark.sql.functions import col, desc

# Variant 1: Column expressions for the filter and the sort order
long_haul = df.select("distance", "origin", "destination").where(col("distance") > 1000)
long_haul.orderBy(desc("distance")).show(10)

# Variant 2: a SQL-string predicate and the `ascending` flag
(df.select("distance", "origin", "destination")
   .where("distance > 1000")
   .orderBy("distance", ascending=False)
   .show(10))

### 93p

In [None]:
# In Scala/Python
# IF NOT EXISTS keeps the cell re-runnable: a plain CREATE DATABASE raises an
# error when learn_spark_db already exists (e.g. on the notebook's second run).
spark.sql("CREATE DATABASE IF NOT EXISTS learn_spark_db")
spark.sql("USE learn_spark_db")

### 94p 기존 코드에서 오류가 발생하여 아래 코드로 수정

In [None]:
# # In Scala/Python -- original code from the book, kept for reference only
# # (it raised an error in this environment; the corrected version follows).
# spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)")

In [None]:
# Corrected code (the original CREATE TABLE raised an error).
# Without a USING clause, CREATE TABLE may default to a Hive-format table when
# spark.sql.legacy.createHiveTableByDefault is true. That is presumably what
# failed here (no Hive support). Setting the flag to false makes the table use
# Spark's default data source instead.
from pyspark.sql import SparkSession

# `spark` already exists, and SparkSession.builder...getOrCreate() would only
# return that same session, so set the SQL config on it directly.
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "false")

# In Scala/Python
# IF NOT EXISTS lets this cell be re-run after the table has been created.
spark.sql("CREATE TABLE IF NOT EXISTS managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)")

In [None]:
# In Python
# Path to our US flight delays CSV file
csv_file = "PySpark/data/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Schema as defined in the preceding example
schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"
# The file begins with a header row (it is read with header=true elsewhere in
# this notebook). With an explicit schema and no header=True, that row would be
# loaded as a bogus data record (date="date", the INT columns null).
flights_df = spark.read.csv(csv_file, schema=schema, header=True)

# Drop the table created above so saveAsTable can recreate it from this DataFrame
spark.sql("DROP TABLE IF EXISTS managed_us_delay_flights_tbl")
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")

In [None]:
# Drop the us_delay_flights_tbl defined earlier before redefining it
spark.sql("DROP TABLE IF EXISTS us_delay_flights_tbl")

# Unmanaged table over the CSV file. header "true" skips the file's header row,
# which would otherwise be read as a data row under the explicit schema.
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, distance INT, origin STRING, destination STRING)
    USING csv OPTIONS (
    PATH "PySpark/data/databricks-datasets/learning-spark-v2/flights/departuredelays.csv",
    header "true")"""
    )

In [None]:
# Overwrite: permit the CTAS below to target a location that may already
# contain files (e.g. left over from a previous run of this cell).
spark.conf.set("spark.sql.legacy.allowNonEmptyLocationInCTAS", "true")

spark.sql("DROP TABLE IF EXISTS us_delay_flights_tbl")

# Unmanaged table: its data is stored at the explicit path given here.
table_writer = flights_df.write.option("path", "/tmp/data/us_flights_delay")
table_writer.saveAsTable("us_delay_flights_tbl")

### 95p

In [None]:
# -- In SQL
# A global temporary view (queried via the global_temp database) for SFO
# departures, and a session-scoped temporary view for JFK departures.
sfo_view_sql = """CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = 'SFO'"""
jfk_view_sql = """CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = 'JFK'"""
for view_ddl in (sfo_view_sql, jfk_view_sql):
    spark.sql(view_ddl)

### 96p

In [None]:
# In Python: build the same two views from DataFrames

# Global temporary view of SFO departures
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'")
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")

# Session-scoped temporary view of JFK departures
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'JFK'")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")

In [None]:
# -- In SQL
# Each query is followed by .show(); otherwise the DataFrames are only built
# and discarded, and the cell displays nothing useful.
# Global temp views must be qualified with the global_temp database.
spark.sql("""SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view""").show(10)
# Session-scoped temp views are referenced by their bare name.
spark.sql("""SELECT * FROM us_origin_airport_JFK_tmp_view""").show(10)
# In Scala/Python
spark.read.table("us_origin_airport_JFK_tmp_view").show(10)
# Or
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view").show(10)

In [None]:
# -- SQL equivalent (the global view lives in the global_temp database):
# DROP VIEW IF EXISTS global_temp.us_origin_airport_SFO_global_tmp_view;
# DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view

# In Scala/Python: drop both views through the catalog API
catalog = spark.catalog
catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
catalog.dropTempView("us_origin_airport_JFK_tmp_view")

### 97p

In [None]:
# In Scala/Python
# Inspect catalog metadata. A notebook cell only echoes its last expression,
# so print each result explicitly; otherwise the first two would be discarded.
print(spark.catalog.listDatabases())
print(spark.catalog.listTables())
print(spark.catalog.listColumns("us_delay_flights_tbl"))

In [None]:
# # -- In SQL (reference syntax only, not executed)
# # Caching a table
# CACHE [LAZY] TABLE <table-name>
# UNCACHE TABLE <table-name>

### 98p

In [None]:
# Two equivalent ways to get a DataFrame over the table: the table API and a SQL query
us_flights_df2 = spark.table("us_delay_flights_tbl")
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")

### 102p

In [None]:
# In Python
# Load the 2010 flight summary stored as Parquet and preview 10 rows
file = "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"
df = spark.read.parquet(file)
df.show(10)

### 103p

In [None]:
# -- In SQL
# Register the Parquet summary files as a temporary view.
# NOTE(review): this reuses the name us_delay_flights_tbl, which also names a
# permanent table created earlier; confirm which one later queries should hit.
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl 
    USING parquet 
    OPTIONS (
        path "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/" )
""")

# In Python: query the view and show the first 10 rows
parquet_preview = spark.sql("SELECT * FROM us_delay_flights_tbl")
parquet_preview.show(10)

# # Alternative in Python: read the files into a DataFrame ...
# us_delay_flights_df = spark.read.format("parquet").option("path", "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/").load()

# # ... and register it as a temporary view.
# us_delay_flights_df.createOrReplaceTempView("us_delay_flights_tbl")

### 104p

In [None]:
# In Python
# Write the DataFrame as Snappy-compressed Parquet, replacing earlier output
parquet_writer = (df.write
                  .format("parquet")
                  .option("compression", "snappy")
                  .mode("overwrite"))
parquet_writer.save("/tmp/data/parquet/df_parquet")

In [None]:
# In Python
# Persist the DataFrame as a table, replacing the table if it already exists.
# NOTE(review): a temporary view with this same name was registered in an
# earlier cell; queries by this name in this session may still resolve to it.
overwrite_writer = df.write.mode("overwrite")
overwrite_writer.saveAsTable("us_delay_flights_tbl")

### 105p

In [None]:
# In Python
# Read the JSON flight summaries and preview 10 rows.
# Uses the same project-relative dataset path as the rest of the notebook. The
# previous absolute, user-specific path (with a doubled slash) only worked on
# one machine.
file = "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df = spark.read.format("json").load(file)
df.show(10)

### 106p

In [None]:
# -- In SQL
# Register the JSON summary files as a temporary view
json_view_ddl = """
    CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl USING json
    OPTIONS (
    path "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
    )
"""
spark.sql(json_view_ddl)

# In Scala/Python: preview the view
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)

In [None]:
# In Python
# Write the DataFrame as Snappy-compressed JSON, replacing earlier output
json_writer = df.write.format("json").mode("overwrite").option("compression", "snappy")
json_writer.save("/tmp/data/json/df_json")

### 108p

In [None]:
# In Python
# Read the CSV flight summaries with an explicit schema.
file = "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df = (spark.read.format("csv")
    .option("header", "true")    # first line of each file holds column names
    .schema(schema)
    .option("mode", "FAILFAST")  # raise on the first malformed record
    .option("nullValue", "")     # treat empty fields as null
    .load(file))

In [None]:
# -- In SQL
# CSV-backed temporary view: header row used for column names, schema inferred,
# FAILFAST parse mode.
csv_view_ddl = """CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl USING csv
OPTIONS (
path "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*", header "true",
inferSchema "true",
mode "FAILFAST"
)"""
spark.sql(csv_view_ddl)

In [None]:
# In Scala/Python
# Preview the first 10 rows of the view
csv_preview = spark.sql("SELECT * FROM us_delay_flights_tbl")
csv_preview.show(10)

### 109p

In [None]:
# In Python
# Save the DataFrame as CSV, replacing earlier output
csv_writer = df.write.format("csv").mode("overwrite")
csv_writer.save("/tmp/data/csv/df_csv")

### 110p

In [None]:
# Load the Avro flight summaries and show them without truncating values.
# NOTE(review): the "avro" format comes from the external spark-avro module, and
# the session created at the top of this notebook adds no extra packages;
# confirm spark-avro is on the classpath (e.g. via spark.jars.packages).
avro_path = "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
df = spark.read.format("avro").load(avro_path)
df.show(truncate=False)

### 111p

In [None]:
# -- In SQL
# Register the Avro summary files as the temporary view episode_tbl
avro_view_ddl = """CREATE OR REPLACE TEMPORARY VIEW episode_tbl USING avro
OPTIONS (
path "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
)"""
spark.sql(avro_view_ddl)

In [None]:
# In Python
# Show the view's rows without truncating long values
episodes = spark.sql("SELECT * FROM episode_tbl")
episodes.show(truncate=False)

### 112p

In [None]:
# In Python
# Write the DataFrame in Avro format, replacing earlier output
avro_writer = df.write.format("avro").mode("overwrite")
avro_writer.save("/tmp/data/avro/df_avro")

### 113p

In [None]:
# In Python
# Load the ORC flight summaries and show 10 rows without truncation
file = "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df = spark.read.format("orc").load(file)
df.show(n=10, truncate=False)

In [None]:
# -- In SQL
# Register the ORC summary files as a temporary view
orc_view_ddl = """CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl 
    USING orc
    OPTIONS (
    path "PySpark/data/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
    )"""
spark.sql(orc_view_ddl)

In [None]:
# In Scala/Python
# Preview the first 10 rows of the ORC-backed view
orc_preview = spark.sql("SELECT * FROM us_delay_flights_tbl")
orc_preview.show(10)

In [None]:
# In Python
# Write the DataFrame as Snappy-compressed ORC, replacing earlier output
orc_writer = (df.write
              .format("orc")
              .option("compression", "snappy")
              .mode("overwrite"))
orc_writer.save("/tmp/data/orc/flights_orc")

### 116p

In [None]:
# In Python
from pyspark.ml import image

# Load the CCTV training images with the "image" data source and print the schema
image_dir = "PySpark/data/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()

In [None]:
# Show image metadata fields plus the label column for the first 5 rows
image_columns = ["image.height", "image.width", "image.nChannels", "image.mode", "label"]
images_df.select(*image_columns).show(5, truncate=False)

In [None]:
# In Python
# Load the .jpg files in the directory with the binaryFile data source
path = "PySpark/data/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read
                   .format("binaryFile")
                   .option("pathGlobFilter", "*.jpg")
                   .load(path))
binary_files_df.show(5)

In [None]:
# In Python
# Same as the previous cell (reuses its `path`), with recursiveFileLookup enabled
jpg_reader = (spark.read.format("binaryFile")
              .option("pathGlobFilter", "*.jpg")
              .option("recursiveFileLookup", "true"))
binary_files_df = jpg_reader.load(path)
binary_files_df.show(5)