In [None]:
# Common Error 
> InvalidHttpRequestToLivy: Your Spark job requested 40 vcores. However, the workspace has a 12 core limit. Try reducing the numbers of vcores requested or increasing your vcore quota. Quota can be increased using Azure Support request https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-concepts#workspace-level HTTP status code: 400. Trace ID: bd7c1173-58b2-4a87-9d3f-5986c970afd9
- [Docs](https://learn.microsoft.com/en-us/answers/questions/1151904/invalidhttprequesttolivy-your-spark-job-requested)

## Solution:
1. - Create a serverless Apache Spark pool
2. - In Synapse Studio, on the left-side pane, select Manage > Apache Spark pools.
3. - Select New
4. - For Apache Spark pool name enter Spark1.
5. - For Node size enter Small.
6. - For Number of nodes Set the minimum to 3 and the maximum to 3
7. - Select Review + create > Create. Your Apache Spark pool will be ready in a few seconds.

In [1]:
import requests
import pandas as pd
from io import BytesIO

# URL of the Parquet file
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet'

# Send a GET request to download the file
response = requests.get(url)

# Check if the request was successful
if response.status_code == 200:
    # Use BytesIO to handle the binary data from the response
    parquet_file = BytesIO(response.content)
    
    # Read the Parquet file into a pandas DataFrame
    df = pd.read_parquet(parquet_file, engine='pyarrow')
    
    # Display the first few rows of the DataFrame
    print(df.head())
else:
    print(f"Failed to download file. Status code: {response.status_code}")


StatementMeta(small, 0, 2, Finished, Available, Finished)

   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2024-01-01 00:57:55   2024-01-01 01:17:43              1.0   
1         1  2024-01-01 00:03:00   2024-01-01 00:09:36              1.0   
2         1  2024-01-01 00:17:06   2024-01-01 00:35:01              1.0   
3         1  2024-01-01 00:36:38   2024-01-01 00:44:56              1.0   
4         1  2024-01-01 00:46:51   2024-01-01 00:52:57              1.0   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           1.72         1.0                  N           186            79   
1           1.80         1.0                  N           140           236   
2           4.70         1.0                  N           236            79   
3           1.40         1.0                  N            79           211   
4           0.80         1.0                  N           211           148   

   payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  \


In [5]:
%%pyspark
df = spark.read.load('abfss://syna@synou76.dfs.core.windows.net/synapse/workspaces/yellow_tripdata_2024-01.parquet', format='parquet')
display(df.limit(10))

StatementMeta(small, 0, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 90003c20-bd34-46f9-8f2b-475d0babd8d8)

In [6]:
%%pyspark
df = spark.read.load('abfss://syna@synou76.dfs.core.windows.net/synapse/workspaces/yellow_tripdata_2024-01.parquet', format='parquet')
df.write.mode("overwrite").saveAsTable("default.NYC")

StatementMeta(small, 0, 7, Finished, Available, Finished)

In [16]:
from pyspark.sql import SparkSession

def get_table():
    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("List Tables") \
        .getOrCreate()

    # List all tables in the default database
    tables = spark.catalog.listTables()

    # Print table names
    for table in tables:
        print(table.name)

get_table()


StatementMeta(small, 0, 17, Finished, Available, Finished)

nyc


In [10]:
%%pyspark
df.printSchema()

StatementMeta(small, 0, 11, Finished, Available, Finished)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [11]:
%%pyspark
spark.sql("CREATE DATABASE IF NOT EXISTS nyctaxi")
df.write.mode("overwrite").saveAsTable("nyctaxi.trip")

StatementMeta(small, 0, 12, Finished, Available, Finished)

In [12]:
%%pyspark
df = spark.sql("SELECT * FROM nyctaxi.trip") 
display(df)

StatementMeta(small, 0, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 77c8d09c-81e9-4b7b-b960-2e076745ad24)

In [13]:
%%pyspark
df = spark.sql("""
   SELECT passenger_count,
       SUM(trip_distance) as SumTripDistance,
       AVG(trip_distance) as AvgTripDistance
   FROM nyctaxi.trip
   WHERE trip_distance > 0 AND passenger_count > 0
   GROUP BY passenger_count
   ORDER BY passenger_count
""") 
display(df)
df.write.saveAsTable("nyctaxi.passengercountstats")

StatementMeta(small, 0, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, da2b051a-4c1a-4494-b1fc-ce1b7e742ff0)

In [19]:
%%pyspark
df = spark.sql("SELECT * FROM nyctaxi.passengercountstats")
df = df.repartition(1) # This ensures we'll get a single file during write()

# this creates new folder 
df.write.mode("overwrite").csv("/NYCTaxi/PassengerCountStats_csvformat")
df.write.mode("overwrite").parquet("/NYCTaxi/PassengerCountStats_parquetformat")

# display(df)

StatementMeta(small, 0, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 791b31f4-cd9e-431c-ab20-219cc5f90b83)

In [21]:
%%pyspark
abspath = 'abfss://synou76.blob.core.windows.net/syna/NYCTaxi/PassengerCountStats_parquetformat/part-00000-07c6310a-4189-4186-bb38-79c85b03652d-c000.snappy.parquet'
df = spark.read.load(abspath, format='parquet')
display(df.limit(10))

StatementMeta(small, 0, 22, Finished, Available, Finished)

Py4JJavaError: An error occurred while calling o8556.load.
: abfss://synou76.blob.core.windows.net/syna/NYCTaxi/PassengerCountStats_parquetformat/part-00000-07c6310a-4189-4186-bb38-79c85b03652d-c000.snappy.parquet has invalid authority.
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.authorityParts(AzureBlobFileSystemStore.java:354)
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:207)
	at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:192)
	at com.microsoft.vegas.vfs.VegasFileSystem.initialize(VegasFileSystem.java:133)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:725)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:723)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:552)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:236)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:219)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:219)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
