# AVRO DATA FILE

Data source: the OpenWeatherMap "Current weather" open API (lookup by city name). Reference: https://openweathermap.org/current#name

```
Deploying the Avro driver when running Spark on a local machine (i.e. not on Azure Databricks)
The spark-avro module is external and not included in spark-submit or spark-shell by default.

As with any Spark applications, spark-submit is used to launch your application. spark-avro_2.12 and its dependencies can be directly added to spark-submit using --packages, such as,

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.1 ...
For experimenting on spark-shell, you can also use --packages to add org.apache.spark:spark-avro_2.12 and its dependencies directly,

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.1 ...
```

Avro: https://spark.apache.org/docs/latest/sql-data-sources-avro.html

For Azure Databricks, please install the Avro JAR package on the cluster in order to read and write Avro files:

https://learn.microsoft.com/en-us/azure/databricks/query/formats/avro

In [1]:
# Places to query against the weather API, one 3-tuple of strings per entry.
# get_data() passes the first element of each tuple as the city name.
# NOTE(review): the second element looks like a short local code and the third
# a country code — confirm the intended meaning. The short codes are not unique
# ("VT" and "QN" each appear twice), so they cannot serve as a key.
cities = [
    ("Ho Chi Minh City", "SG", "VN"),
    ("Hanoi", "HN", "VN"),
    ("Da Nang", "DN", "VN"),
    ("Haiphong", "HP", "VN"),
    ("Bien Hoa", "BD", "VN"),
    ("Nha Trang", "NT", "VN"),
    ("Can Tho", "CT", "VN"),
    ("Hue", "HU", "VN"),
    ("Vung Tau", "VT", "VN"),
    ("Quy Nhon", "QN", "VN"),
    ("Buon Ma Thuot", "BM", "VN"),
    ("Pleiku", "GL", "VN"),
    ("My Tho", "MT", "VN"),
    ("Rach Gia", "RG", "VN"),
    ("Long Xuyen", "LX", "VN"),
    ("Tan An", "TA", "VN"),
    ("Vi Thanh", "VT", "VN"),
    ("Dong Hoi", "DH", "VN"),
    ("Sa Dec", "SD", "VN"),
    ("Soc Trang", "ST", "VN"),
    ("Yen Bai", "YB", "VN"),
    ("Tuy Hoa", "TH", "VN"),
    ("Cam Ranh", "CR", "VN"),
    ("Chau Doc", "CD", "VN"),
    ("Ha Tien", "HT", "VN"),
    ("Ben Tre", "BT", "VN"),
    ("Tra Vinh", "TV", "VN"),
    ("Bac Lieu", "BL", "VN"),
    ("Ca Mau", "CM", "VN"),
    ("Dong Xoai", "DX", "VN"),
    ("Lai Chau", "LC", "VN"),
    ("Dien Bien Phu", "DB", "VN"),
    ("Son La", "SL", "VN"),
    ("Ninh Binh", "NB", "VN"),
    ("Thai Nguyen", "TN", "VN"),
    ("Lang Son", "LS", "VN"),
    ("Hoa Binh", "HB", "VN"),
    ("Ha Long", "HL", "VN"),
    ("Moc Chau", "MC", "VN"),
    ("Phan Rang-Thap Cham", "PTC", "VN"),
    ("Bac Giang", "BG", "VN"),
    ("Bac Ninh", "BN", "VN"),
    ("Hai Duong", "HD", "VN"),
    ("Thai Binh", "TB", "VN"),
    ("Quang Ninh", "QN", "VN")]

In [10]:
import requests 

def get_weather(api_key, city, country) -> dict:
    """Fetch the current weather for one city from the OpenWeatherMap API.

    Args:
        api_key: OpenWeatherMap API key, sent as the ``appid`` query parameter.
        city: City name to look up.
        country: Country qualifier, sent after the city as ``q=<city>,<country>``.

    Returns:
        The decoded JSON body of the response. The HTTP status is not checked
        here, so error payloads (e.g. ``{'cod': '404', ...}``) are returned
        as-is for the caller to inspect.

    Raises:
        requests.exceptions.RequestException: on connection failure or timeout.
    """
    response = requests.get(
        "https://api.openweathermap.org/data/2.5/weather",
        # Let requests build the query string so spaces and reserved
        # characters in the city name or key are percent-encoded correctly.
        params={"q": f"{city},{country}", "appid": api_key},
        # Without a timeout a stalled connection would block the cell forever.
        timeout=10,
    )
    return response.json()

def get_data(api_key, cities, fetch=None):
    """Collect the weather payload for every city that the API resolves.

    Args:
        api_key: API key forwarded to the fetch function.
        cities: Iterable of tuples. Element 0 is the city name; element 2,
            when present, is used as the country qualifier.
        fetch: Optional callable taking ``api_key``, ``city`` and ``country``
            keyword arguments and returning the decoded payload as a dict.
            Defaults to ``get_weather``; mainly useful for offline testing.

    Returns:
        List of payload dicts whose ``cod`` field equals 200. Every other
        payload (404 not found, 401 bad key, 429 rate limit, ...) is printed
        and skipped so it cannot reach the DataFrame schema downstream.
    """
    if fetch is None:
        # Resolved at call time so the default follows the notebook's helper.
        fetch = get_weather

    data = []
    for city in cities:
        # Prefer the country code carried by the tuple; fall back to the
        # previous hard-coded value for tuples that have no third element.
        country = city[2] if len(city) > 2 else "Vietnam"
        response = fetch(api_key=api_key,
                         city=city[0],
                         country=country)

        # Success payloads carry cod as the int 200 while error payloads may
        # use a string ('404') or an int (401), so compare on the string form.
        if str(response.get('cod')) == '200':
            data.append(response)
        else:
            print(response)

    return data

import os

# Read the API key from the environment instead of embedding it in the
# notebook. Falls back to an empty key when OPENWEATHER_API_KEY is not set,
# which matches the previous hard-coded value.
data = get_data(api_key=os.environ.get("OPENWEATHER_API_KEY", ""), cities=cities)

{'cod': '404', 'message': 'city not found'}
{'cod': '404', 'message': 'city not found'}
{'cod': '404', 'message': 'city not found'}


In [17]:
# initial spark session running on local machine
# TODO: !pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Build (or reuse) a SparkSession on the local master with two worker threads.
# The spark-avro package is requested through spark.jars.packages because it
# is an external module that is not bundled with Spark by default.
# NOTE(review): the package coordinates pin Scala 2.12 / Spark 3.5.1 —
# presumably they must match the installed PySpark version; confirm.
spark = (
    SparkSession.builder
    .appName("Connect Spark to avro")
    .master("local[2]")
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.1")
    .getOrCreate()
)

# Command-line equivalent:
#   ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.1 ...

# Raise the log threshold to WARN to cut down on console output.
spark.sparkContext.setLogLevel("WARN")

:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/kato/.ivy2/cache
The jars for the packages stored in: /Users/kato/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-922243ba-107a-405f-accc-454794f85171;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.5.1 in central
	found org.tukaani#xz;1.9 in central
:: resolution report :: resolve 256ms :: artifacts dl 6ms
	:: modules in use:
	org.apache.spark#spark-avro_2.12;3.5.1 from central in [default]
	org.tukaani#xz;1.9 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------


In [15]:
from pyspark.sql.types import *


# Explicit schema for the weather payload dicts, written with the
# StructType.add() builder. Every field is nullable (the default for add()).
# NOTE(review): decimal-looking values (coordinates, temperatures, wind speed
# and gust) are declared as strings — presumably because the API mixes ints
# and floats for them; confirm before switching to a numeric type.
dfSchema = (
    StructType()
    .add('coord', StructType()
         .add('lon', StringType())
         .add('lat', StringType()))
    .add('weather', ArrayType(
        StructType()
        .add('id', IntegerType())
        .add('main', StringType())
        .add('description', StringType())
        .add('icon', StringType())))
    .add('base', StringType())
    .add('main', StructType()
         .add('temp', StringType())
         .add('feels_like', StringType())
         .add('temp_min', StringType())
         .add('temp_max', StringType())
         .add('pressure', IntegerType())
         .add('humidity', IntegerType())
         .add('sea_level', IntegerType())
         .add('grnd_level', IntegerType()))
    .add('visibility', IntegerType())
    .add('wind', StructType()
         .add('speed', StringType())
         .add('deg', IntegerType())
         .add('gust', StringType()))
    .add('clouds', StructType()
         .add('all', IntegerType()))
    .add('dt', IntegerType())
    .add('sys', StructType()
         .add('type', IntegerType())
         .add('id', IntegerType())
         .add('country', StringType())
         .add('sunrise', IntegerType())
         .add('sunset', IntegerType()))
    .add('timezone', IntegerType())
    .add('id', IntegerType())
    .add('name', StringType())
    .add('cod', IntegerType())
)

In [19]:
# Turn the collected payload dicts into a Spark DataFrame using the explicit
# schema, then print the first 20 rows with long cell values truncated.
df = spark.createDataFrame(data=data, schema=dfSchema)
df.show(n=20, truncate=True)

                                                                                

+-------------------+--------------------+--------+--------------------+----------+------------------+------+----------+--------------------+--------+-------+----------------+---+
|              coord|             weather|    base|                main|visibility|              wind|clouds|        dt|                 sys|timezone|     id|            name|cod|
+-------------------+--------------------+--------+--------------------+----------+------------------+------+----------+--------------------+--------+-------+----------------+---+
|  {106.6667, 10.75}|[{802, Clouds, sc...|stations|{301.16, 304.97, ...|     10000| {5.14, 140, NULL}|  {40}|1709819233|{1, 9314, VN, 170...|   25200|1566083|Ho Chi Minh City|200|
|{105.8412, 21.0245}|[{804, Clouds, ov...|stations|{295.15, 295, 295...|     10000|  {1.79, 103, 3.5}| {100}|1709819337|{1, 9308, VN, 170...|   25200|1581130|           Hanoi|200|
|{108.2208, 16.0678}|[{803, Clouds, br...|stations|{297.14, 297.76, ...|     10000|  {6.17, 20, NULL

In [23]:
# Persist the DataFrame in Avro format, replacing any previous output.
# NOTE(review): the '.arvo' suffix looks like a typo for '.avro'; the read
# cell loads the same name, so rename both together if it is changed.
(
    df.write
    .format('avro')
    .mode('overwrite')
    .save('avro-test.arvo')
)

In [25]:
# Load the Avro output back into a DataFrame, then show its rows followed by
# the schema recovered from the file.
df_input = spark.read.load('avro-test.arvo', format="avro")

df_input.show()
df_input.printSchema()

+-------------------+--------------------+--------+--------------------+----------+------------------+------+----------+--------------------+--------+-------+----------------+---+
|              coord|             weather|    base|                main|visibility|              wind|clouds|        dt|                 sys|timezone|     id|            name|cod|
+-------------------+--------------------+--------+--------------------+----------+------------------+------+----------+--------------------+--------+-------+----------------+---+
|  {106.6667, 10.75}|[{802, Clouds, sc...|stations|{301.16, 304.97, ...|     10000| {5.14, 140, NULL}|  {40}|1709819233|{1, 9314, VN, 170...|   25200|1566083|Ho Chi Minh City|200|
|{105.8412, 21.0245}|[{804, Clouds, ov...|stations|{295.15, 295, 295...|     10000|  {1.79, 103, 3.5}| {100}|1709819337|{1, 9308, VN, 170...|   25200|1581130|           Hanoi|200|
|{108.2208, 16.0678}|[{803, Clouds, br...|stations|{297.14, 297.76, ...|     10000|  {6.17, 20, NULL