In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark

In [2]:
df = spark.range(0, 10000, 1, 8)
print(df.rdd.getNumPartitions())

8


In [3]:
df.show(0)

+---+
| id|
+---+
+---+
only showing top 0 rows



In [4]:
import urllib

sparkHost = "sparky"

for x in range(4040, 4060):
    link = "http://" + sparkHost + ":" + str(x) + "/environment/"
    try:
        f = urllib.request.urlopen(link)
        myfile = f.read()
        if sc.applicationId in str(myfile):
            print("Application ID found on port ", x)
    except:
        pass

In [5]:
from pyspark import SparkContext as sc

In [6]:
sc.uiWebUrl.fdel

## The DataFrame API

Разные способы определить схему

In [7]:
from pyspark.sql.types import *

In [8]:
schema = StructType(
    [
        StructField("author", StringType(), False),
        StructField("title", StringType(), False),
        StructField("pages", IntegerType(), False),
    ]
)

In [9]:
schema = "author STRING, title STRING, pages INT"

In [10]:
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

In [11]:
data = [
    [
        1,
        "Jules",
        "Damji",
        "https://tinyurl.1",
        "1/4/2016",
        4535,
        ["twitter", "LinkedIn"],
    ],
    [
        2,
        "Brooke",
        "Wenig",
        "https://tinyurl.2",
        "5/5/2018",
        8908,
        ["twitter", "LinkedIn"],
    ],
    [
        3,
        "Denny",
        "Lee",
        "https://tinyurl.3",
        "6/7/2019",
        7659,
        ["web", "twitter", "FB", "LinkedIn"],
    ],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [
        5,
        "Matei",
        "Zaharia",
        "https://tinyurl.5",
        "5/14/2014",
        40578,
        ["web", "twitter", "FB", "LinkedIn"],
    ],
    [
        6,
        "Reynold",
        "Xin",
        "https://tinyurl.6",
        "3/2/2015",
        25568,
        ["twitter", "LinkedIn"],
    ],
]

In [12]:
schema = StructType(
    [
        StructField("Id", IntegerType(), False),
        StructField("First", StringType(), False),
        StructField("Last", StringType(), False),
        StructField("Url", StringType(), False),
        StructField("Published", StringType(), False),
        StructField("Hits", IntegerType(), False),
        StructField("Campaigns", ArrayType(StringType()), False),
    ]
)

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [14]:
blogs_df = spark.createDataFrame(data, schema)

In [15]:
blogs_df.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [16]:
blogs_df.printSchema()

root
 |-- Id: integer (nullable = false)
 |-- First: string (nullable = false)
 |-- Last: string (nullable = false)
 |-- Url: string (nullable = false)
 |-- Published: string (nullable = false)
 |-- Hits: integer (nullable = false)
 |-- Campaigns: array (nullable = false)
 |    |-- element: string (containsNull = true)



In [17]:
blogs_df.schema

StructType([StructField('Id', IntegerType(), False), StructField('First', StringType(), False), StructField('Last', StringType(), False), StructField('Url', StringType(), False), StructField('Published', StringType(), False), StructField('Hits', IntegerType(), False), StructField('Campaigns', ArrayType(StringType(), True), False)])

функциии

In [18]:
import pyspark.sql.functions as F

In [19]:
blogs_df.columns

['Id', 'First', 'Last', 'Url', 'Published', 'Hits', 'Campaigns']

In [20]:
blogs_df.select(F.expr("Hits * 2")).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [21]:
blogs_df.select(F.col("Hits") * 2).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [22]:
blogs_df.withColumn(
    "AuthorsId", (F.concat(F.expr("First"), F.expr("Last"), F.expr("Id")))
).select("AuthorsId").show(3)

+------------+
|   AuthorsId|
+------------+
| JulesDamji1|
|BrookeWenig2|
|   DennyLee3|
+------------+
only showing top 3 rows



Загружаем таблицу и определяем схему

In [23]:
from pyspark.sql.types import *

fire_schema = StructType(
    [
        StructField("CallNumber", IntegerType(), True),
        StructField("UnitID", StringType(), True),
        StructField("IncidentNumber", IntegerType(), True),
        StructField("CallType", StringType(), True),
        StructField("CallDate", StringType(), True),
        StructField("WatchDate", StringType(), True),
        StructField("CallFinalDisposition", StringType(), True),
        StructField("AvailableDtTm", StringType(), True),
        StructField("Address", StringType(), True),
        StructField("City", StringType(), True),
        StructField("Zipcode", IntegerType(), True),
        StructField("Battalion", StringType(), True),
        StructField("StationArea", StringType(), True),
        StructField("Box", StringType(), True),
        StructField("OriginalPriority", StringType(), True),
        StructField("Priority", StringType(), True),
        StructField("FinalPriority", IntegerType(), True),
        StructField("ALSUnit", BooleanType(), True),
        StructField("CallTypeGroup", StringType(), True),
        StructField("NumAlarms", IntegerType(), True),
        StructField("UnitType", StringType(), True),
        StructField("UnitSequenceInCallDispatch", IntegerType(), True),
        StructField("FirePreventionDistrict", StringType(), True),
        StructField("SupervisorDistrict", StringType(), True),
        StructField("Neighborhood", StringType(), True),
        StructField("Location", StringType(), True),
        StructField("RowID", StringType(), True),
        StructField("Delay", FloatType(), True),
    ]
)

In [24]:
sf_fire_file = "./LearningSparkV2/chapter3/data/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

In [25]:
fire_df.show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

In [26]:
fire_df.write.format("parquet").save("owl.csv")

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/home/jovyan/code/owl.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [None]:
fire_df.write.format("parquet").saveAsTable("owl")

In [None]:
spark.read.parquet("owl.csv").csv

DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallDate: string, WatchDate: string, CallFinalDisposition: string, AvailableDtTm: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, Delay: float]

In [None]:
import pyarrow.parquet as pq

df = pq.read_table("owl.csv")

In [None]:
fire_df.select("IncidentNumber", "AvailableDtTm", "CallType").where(
    F.col("CallType") != "Medical Incident"
).show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [None]:
fire_df.select("IncidentNumber", "AvailableDtTm", "CallType").where(
    F.col("CallType") != "Medical Incident"
).show(5, truncate=True)

+--------------+--------------------+--------------+
|IncidentNumber|       AvailableDtTm|      CallType|
+--------------+--------------------+--------------+
|       2003235|01/11/2002 01:51:...|Structure Fire|
|       2003250|01/11/2002 04:16:...|  Vehicle Fire|
|       2003259|01/11/2002 06:01:...|        Alarms|
|       2003279|01/11/2002 08:03:...|Structure Fire|
|       2003301|01/11/2002 09:46:...|        Alarms|
+--------------+--------------------+--------------+
only showing top 5 rows



In [None]:
fire_df.select("CallType").where(F.col("CallType").isNotNull()).agg(
    F.countDistinct("CallType").alias("DistinctCallTypes")
).show()

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [None]:
fire_df.select("CallType").where(F.col("CallType").isNotNull()).distinct().alias(
    "DistinctCallTypes"
).show(10, False)

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Aircraft Emergency           |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Explosion                    |
|Oil Spill                    |
|Vehicle Fire                 |
|Suspicious Package           |
+-----------------------------+
only showing top 10 rows



In [None]:
fire_ts_df = (
    fire_df.withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "MM/dd/yyyy"))
    .drop("CallDate")
    .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"), "MM/dd/yyyy"))
    .drop("WatchDate")
    .withColumn(
        "AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")
    )
)

In [None]:
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(4)

+-------------------+-------------------+-------------------+
|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
+-------------------+-------------------+-------------------+
only showing top 4 rows



In [None]:
fire_ts_df.select(F.year("IncidentDate")).distinct().orderBy(
    F.year("IncidentDate")
).show()

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



In [None]:
fire_ts_df.select("CallType").where(F.col("CallType").isNotNull()).groupBy(
    "CallType"
).count().orderBy("count", ascending=False).show(n=10, truncate=False)

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [None]:
fire_ts_df.select("CallType").where(F.col("CallType").isNotNull()).groupBy(
    "CallType"
).agg(F.count("CallType")).show()

+--------------------+---------------+
|            CallType|count(CallType)|
+--------------------+---------------+
|Elevator / Escala...|            453|
|  Aircraft Emergency|             36|
|              Alarms|          19406|
|Odor (Strange / U...|            490|
|Citizen Assist / ...|           2524|
|              HazMat|            124|
|           Explosion|             89|
|           Oil Spill|             21|
|        Vehicle Fire|            854|
|  Suspicious Package|             15|
|Extrication / Ent...|             28|
|               Other|           2166|
|        Outside Fire|           2094|
|   Traffic Collision|           7013|
|       Assist Police|             35|
|Gas Leak (Natural...|            764|
|        Water Rescue|            755|
|   Electrical Hazard|            482|
|   High Angle Rescue|             32|
|      Structure Fire|          23319|
+--------------------+---------------+
only showing top 20 rows



In [None]:
fire_ts_df.select(F.count("Address")).show()

+--------------+
|count(Address)|
+--------------+
|        175296|
+--------------+



In [None]:
fire_ts_df.describe().show()

+-------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------------+---------+------------------+------------------+------------------+------------------+-------------------+--------------------+-------------------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+-----------------+
|summary|          CallNumber|            UnitID|    IncidentNumber|            CallType|CallFinalDisposition|       AvailableDtTm|             Address|       City|           Zipcode|Battalion|       StationArea|               Box|  OriginalPriority|          Priority|      FinalPriority|       CallTypeGroup|          NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|            Delay|
+-------+--------------------+----

### SQL

In [27]:
mnm_file = "data/mnm_dataset.csv"
mnm_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(mnm_file)
)

In [28]:
mnm_df.createOrReplaceTempView("mnm_df")

In [29]:
mnm_df_new = spark.sql(
    """SELECT State, Color, Count, sum(Count) AS Total 
             FROM mnm_df 
             GROUP BY State, Color, Count
             ORDER BY Total DESC             
             """
)

In [30]:
mnm_df_new.explain(True)

== Parsed Logical Plan ==
'Sort ['Total DESC NULLS LAST], true
+- 'Aggregate ['State, 'Color, 'Count], ['State, 'Color, 'Count, 'sum('Count) AS Total#328]
   +- 'UnresolvedRelation [mnm_df], [], false

== Analyzed Logical Plan ==
State: string, Color: string, Count: int, Total: bigint
Sort [Total#328L DESC NULLS LAST], true
+- Aggregate [State#322, Color#323, Count#324], [State#322, Color#323, Count#324, sum(Count#324) AS Total#328L]
   +- SubqueryAlias mnm_df
      +- View (`mnm_df`, [State#322,Color#323,Count#324])
         +- Relation [State#322,Color#323,Count#324] csv

== Optimized Logical Plan ==
Sort [Total#328L DESC NULLS LAST], true
+- Aggregate [State#322, Color#323, Count#324], [State#322, Color#323, Count#324, sum(Count#324) AS Total#328L]
   +- Relation [State#322,Color#323,Count#324] csv

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Total#328L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(Total#328L DESC NULLS LAST, 200), ENSURE_REQUIREMEN

## Chapter 4

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate()
# spark.conf.set("spark.sql.catalogImplementation", "hive")

In [5]:
csv_file = "data/departuredelays.csv"

In [6]:
df = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(csv_file)
)

In [7]:
df.show(5)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 5 rows



In [8]:
df.createOrReplaceGlobalTempView("tab1")

In [9]:
schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"
df = spark.read.csv(csv_file, header=True, schema=schema)
df.createOrReplaceGlobalTempView("us_delay_flights_tbl")

In [10]:
spark.sql(
    """ SELECT distance, origin, destination
          FROM global_temp.us_delay_flights_tbl WHERE distance > 1000 
          ORDER BY distance DESC
          """
).show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [11]:
spark.sql(
    """
          SELECT date, delay, origin, destination
          FROM global_temp.us_delay_flights_tbl
          WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
          ORDER BY delay DESC
          """
).show(10)

+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|02190925| 1638|   SFO|        ORD|
|01031755|  396|   SFO|        ORD|
|01022330|  326|   SFO|        ORD|
|01051205|  320|   SFO|        ORD|
|01190925|  297|   SFO|        ORD|
|02171115|  296|   SFO|        ORD|
|01071040|  279|   SFO|        ORD|
|01051550|  274|   SFO|        ORD|
|03120730|  266|   SFO|        ORD|
|01261104|  258|   SFO|        ORD|
+--------+-----+------+-----------+
only showing top 10 rows



In [23]:
owl = spark.sql(
    """
          SELECT delay, origin, destination,
            CASE 
                WHEN delay > 360 THEN "Very Long Delays"
                WHEN delay > 120 AND delay < 360 THEN "Lang Delays"
                WHEN delay > 60 AND delay < 120 THEN "Short Delays"
                WHEN delay > 0 and delay < 60 THEN "Tolerable Delays"
                ELSE "Early"
            END AS Flights_delays
            FROM global_temp.us_delay_flights_tbl
            ORDER BY origin, delay DESC              
          """
)
owl.show(10)

+-----+------+-----------+--------------+
|delay|origin|destination|Flights_delays|
+-----+------+-----------+--------------+
|  333|   ABE|        ATL|   Lang Delays|
|  305|   ABE|        ATL|   Lang Delays|
|  275|   ABE|        ATL|   Lang Delays|
|  257|   ABE|        ATL|   Lang Delays|
|  247|   ABE|        ATL|   Lang Delays|
|  247|   ABE|        DTW|   Lang Delays|
|  219|   ABE|        ORD|   Lang Delays|
|  211|   ABE|        ATL|   Lang Delays|
|  197|   ABE|        DTW|   Lang Delays|
|  192|   ABE|        ORD|   Lang Delays|
+-----+------+-----------+--------------+
only showing top 10 rows



In [22]:
spark.sql("""
          CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
          SELECT delay, origin, destination,
            CASE 
                WHEN delay > 360 THEN "Very Long Delays"
                WHEN delay > 120 AND delay < 360 THEN "Lang Delays"
                WHEN delay > 60 AND delay < 120 THEN "Short Delays"
                WHEN delay > 0 and delay < 60 THEN "Tolerable Delays"
                ELSE "Early"
            END AS Flights_delays
            FROM global_temp.us_delay_flights_tbl
            ORDER BY origin, delay DESC              
          """)

DataFrame[]

In [26]:
spark.sql(    """
          SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view          
          """).show(10)

+-----+------+-----------+--------------+
|delay|origin|destination|Flights_delays|
+-----+------+-----------+--------------+
|  333|   ABE|        ATL|   Lang Delays|
|  305|   ABE|        ATL|   Lang Delays|
|  275|   ABE|        ATL|   Lang Delays|
|  257|   ABE|        ATL|   Lang Delays|
|  247|   ABE|        ATL|   Lang Delays|
|  247|   ABE|        DTW|   Lang Delays|
|  219|   ABE|        ORD|   Lang Delays|
|  211|   ABE|        ATL|   Lang Delays|
|  197|   ABE|        DTW|   Lang Delays|
|  192|   ABE|        ORD|   Lang Delays|
+-----+------+-----------+--------------+
only showing top 10 rows



In [13]:
from pyspark.sql.functions import col, desc

In [14]:
df.select("distance", "origin", "destination").where(col("distance") > 1000).orderBy(
    desc("distance")
).show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [15]:
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")

DataFrame[]

In [16]:
spark.sql("SELECT current_schema()").show()

+------------------+
|current_database()|
+------------------+
|    learn_spark_db|
+------------------+



In [31]:
spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("global_temp.us_delay_flights_tbl")

[Column(name='date', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='delay', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='distance', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='origin', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='destination', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

In [33]:
spark.sql("CACHE LAZY TABLE global_temp.us_delay_flights_tbl")

DataFrame[]

In [34]:
df = spark.read.format('avro').load("./data/avro/*")

AnalysisException: Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of Apache Avro Data Source Guide.