
---

## Do an analysis of the data BEFORE loading it into CosmosDB

### - Read a raw data CSV file from Azure Blob Storage
### - Explore it with Synapse
### - Identify good potential CosmosDB Partition Keys (high cardinality, well distributed)
### - Identify poor potential CosmosDB Partition Keys (low cardinality, skewed distribution)

.

---

.

.


In [14]:
%%pyspark
blob_account_name = "cjoakimstorage22"
blob_container_name = "raw"
from pyspark.sql import SparkSession

sc = SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
blob_sas_token = token_library.getConnectionString("AzureBlobStorage_cjoakimstorage22")

spark.conf.set(
    'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
    blob_sas_token)

blob_url = 'wasbs://raw@cjoakimstorage22.blob.core.windows.net/air_travel_departures.csv'

df = spark.read.load(blob_url, format='csv', header=True, sep='|')
display(df.limit(8))


StatementMeta(poolspark3s, 9, 14, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4f02e3c8-1f96-4a2c-a453-15d05594400e)

## Display the observed structure of the data

In [15]:
df.printSchema()

StatementMeta(poolspark3s, 9, 15, Finished, Available)

root
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- from_iata: string (nullable = true)
 |-- to_iata: string (nullable = true)
 |-- airlineid: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- count: string (nullable = true)
 |-- route: string (nullable = true)
 |-- from_airport_name: string (nullable = true)
 |-- from_airport_tz: string (nullable = true)
 |-- from_airport_lat: string (nullable = true)
 |-- from_airport_lng: string (nullable = true)
 |-- to_airport_name: string (nullable = true)
 |-- to_airport_country: string (nullable = true)
 |-- to_airport_tz: string (nullable = true)
 |-- to_airport_lat: string (nullable = true)
 |-- to_airport_lng: string (nullable = true)

## Display the Row and Column Counts

In [16]:
print((df.count(), len(df.columns)))

StatementMeta(poolspark3s, 9, 16, Finished, Available)

(930808, 18)


## Explore the **airlineid** attribute as a potential CosmosDB Partition Key

In [17]:
attr_name = 'airlineid'
df.select(attr_name).distinct().count()

StatementMeta(poolspark3s, 9, 17, Finished, Available)

705

In [18]:
from pyspark.sql.functions import desc
display(df.groupBy(attr_name).count().sort(desc("count")))

StatementMeta(poolspark3s, 9, 18, Finished, Available)

SynapseWidget(Synapse.DataFrame, ab13202b-3c65-4cb7-b4ab-b9d0c31e69cd)

## Explore the **from_iata** attribute as a potential CosmosDB Partition Key

In [19]:
attr_name = 'from_iata'
df.select(attr_name).distinct().count()

StatementMeta(poolspark3s, 9, 19, Finished, Available)

1022

In [20]:
from pyspark.sql.functions import desc
display(df.groupBy(attr_name).count().sort(desc("count")))

StatementMeta(poolspark3s, 9, 20, Finished, Available)

SynapseWidget(Synapse.DataFrame, e77e4982-a2a6-4bd5-9a11-7cf899cbd6ad)

## Explore the **to_iata** attribute as a potential CosmosDB Partition Key


In [21]:
attr_name = 'to_iata'
df.select(attr_name).distinct().count()

StatementMeta(poolspark3s, 9, 21, Finished, Available)

1669

In [22]:
from pyspark.sql.functions import desc
display(df.groupBy(attr_name).count().sort(desc("count")))

StatementMeta(poolspark3s, 9, 22, Finished, Available)

SynapseWidget(Synapse.DataFrame, 288b6623-755d-4bfd-85db-785890df5519)

## Explore the **to_airport_country** attribute as a potential CosmosDB Partition Key 

In [23]:
attr_name = 'to_airport_country'
df.select(attr_name).distinct().count()

StatementMeta(poolspark3s, 9, 23, Finished, Available)

205

In [24]:
from pyspark.sql.functions import desc
display(df.groupBy(attr_name).count().sort(desc("count")))

StatementMeta(poolspark3s, 9, 24, Finished, Available)

SynapseWidget(Synapse.DataFrame, 77c7ca6e-30c0-4e57-87ca-40ac27631feb)

 ## Explore the **route** attribute as a potential CosmosDB Partition Key

In [25]:
attr_name = 'route'
df.select(attr_name).distinct().count()

StatementMeta(poolspark3s, 9, 25, Finished, Available)

25644

In [26]:
from pyspark.sql.functions import desc

display(df.groupBy(attr_name).count().sort(desc("count")))

StatementMeta(poolspark3s, 9, 26, Finished, Available)

SynapseWidget(Synapse.DataFrame, 14c96076-fd28-4177-b2d6-b017b6c8ec02)