In [11]:
import pyspark
pyspark_version = pyspark.__version__
from pyspark.sql import SparkSession
from delta.pip_utils import configure_spark_with_delta_pip
from pyspark.sql.functions import *


class DeltaSpark:
    """Builds a Hive-enabled SparkSession configured for Delta Lake and prints an ASCII banner.

    Typical use is ``spark = DeltaSpark().initialize()``.
    """

    # Class-level switch: ``initialize`` prints the banner only while this is truthy.
    _SHOW = True

    # Left margin (in spaces) put in front of every banner row.
    _BANNER_INDENT = ' ' * 9

    # Fixed rows of the banner. They are raw strings so the ASCII-art backslashes are taken
    # literally; sequences such as "\ " or "\_" in a normal string are invalid escapes and
    # make newer Python versions emit a SyntaxWarning.
    _BANNER_ROWS = (
        r"______      _____                  _      _  ______     _ _          _  ______               _       ",
        r"| ___ \    /  ___|                | |    | | |  _  \   | | |        | | | ___ \             | |      ",
        r"| |_/ /   _\ `--. _ __   __ _ _ __| | __ | | | | | |___| | |_ __ _  | | | |_/ /___  __ _  __| |_   _ ",
        r"|  __/ | | |`--. \ '_ \ / _` | '__| |/ / | | | | | / _ \ | __/ _` | | | |    // _ \/ _` |/ _` | | | |",
        r"| |  | |_| /\__/ / |_) | (_| | |  |   <  | | | |/ /  __/ | || (_| | | | | |\ \  __/ (_| | (_| | |_| |",
        r"\_|   \__, \____/| .__/ \__,_|_|  |_|\_\ | | |___/ \___|_|\__\__,_| | | \_| \_\___|\__,_|\__,_|\__, |",
        r"       __/ |     | |                     | |                        | |                         __/ |",
    )

    ####################################################################################################################
    def __init__(self, app_name: str = 'PySparkDelta', warehouse: str = 'warehouse/'):
        """Store the settings used later by ``initialize``.

        Args:
            app_name: Value passed to the session builder's ``appName``.
            warehouse: Value set as ``spark.sql.warehouse.dir``.
        """
        super().__init__()
        self._appName = app_name
        self._warehouseDir = warehouse

    ####################################################################################################################
    @staticmethod
    def _delta_version() -> str:
        """Return the installed ``delta_spark`` package version, or ``'unknown'`` if it cannot be found."""
        # Local import keeps this class self-contained; importlib.metadata is standard library.
        from importlib import metadata

        try:
            return metadata.version('delta_spark')
        except metadata.PackageNotFoundError:
            return 'unknown'

    ####################################################################################################################
    def _print_banner(self):
        """Print the ASCII-art banner, with the PySpark and Delta versions on its last row."""
        # Fixed-width fields keep the "|_|" separators of the last row aligned with the art above.
        delta_version_info = f' Version = {self._delta_version()} '.center(22, ' ')
        pyspark_version_info = f'Version = {pyspark_version} '.ljust(19, ' ')
        version_row = (
            f"      |___/      |_| {pyspark_version_info} |_| {delta_version_info} |_|                        |___/ "
        )
        for row in (*self._BANNER_ROWS, version_row):
            print(self._BANNER_INDENT + row)

    ####################################################################################################################
    def initialize(self):
        """Create (or reuse) the Delta-enabled SparkSession and return it.

        The builder registers the Delta SQL extension and catalog, points the warehouse at the
        directory given to the constructor and enables Hive support. After the session exists,
        the log level is set to ``ERROR`` and ``spark.sql.shuffle.partitions`` to 8.

        Returns:
            The session returned by ``getOrCreate()``.
        """
        # --------------------------------------------------------------------------------------------------------------
        builder = (SparkSession
                   .builder
                   .appName(self._appName)
                   .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
                   .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
                   .config('spark.sql.warehouse.dir', self._warehouseDir)
                   .enableHiveSupport()
                   )
        # --------------------------------------------------------------------------------------------------------------
        # The Delta pip helper receives the builder and hands back the one to build the session from.
        spark = configure_spark_with_delta_pip(builder).getOrCreate()
        spark.sparkContext.setLogLevel('ERROR')
        spark.conf.set("spark.sql.shuffle.partitions", 8)
        # --------------------------------------------------------------------------------------------------------------
        if self._SHOW:
            self._print_banner()
        # --------------------------------------------------------------------------------------------------------------

        return spark

In [4]:
# Build the Delta-enabled session with the default app name and warehouse directory.
spark_factory = DeltaSpark()
spark = spark_factory.initialize()

         ______      _____                  _      _  ______     _ _          _  ______               _       
         | ___ \    /  ___|                | |    | | |  _  \   | | |        | | | ___ \             | |      
         | |_/ /   _\ `--. _ __   __ _ _ __| | __ | | | | | |___| | |_ __ _  | | | |_/ /___  __ _  __| |_   _ 
         |  __/ | | |`--. \ '_ \ / _` | '__| |/ / | | | | | / _ \ | __/ _` | | | |    // _ \/ _` |/ _` | | | |
         | |  | |_| /\__/ / |_) | (_| | |  |   <  | | | |/ /  __/ | || (_| | | | | |\ \  __/ (_| | (_| | |_| |
         \_|   \__, \____/| .__/ \__,_|_|  |_|\_\ | | |___/ \___|_|\__\__,_| | | \_| \_\___|\__,_|\__,_|\__, |
                __/ |     | |                     | |                        | |                         __/ |
               |___/      |_| Version = 3.5.0     |_|    Version = 3.1.0     |_|                        |___/ 


In [3]:
# Read the Delta table stored at warehouse/info.db/log and display its first rows.
delta_reader = spark.read.format("delta")
df = delta_reader.load("warehouse/info.db/log")
df.show()

                                                                                

+--------------------+-------+--------------------+-------------+--------------------+--------------------+
|                  id|     db|               table|rows_inserted|          start_time|            end_time|
+--------------------+-------+--------------------+-------------+--------------------+--------------------+
|2024-02-11 19:47:...|gold.db|gold_d_brewery_co...|            0|2024-02-11 16:47:...|2024-02-11 16:47:...|
|2024-02-11 19:52:...|gold.db|gold_d_brewery_co...|            0|2024-02-11 16:52:...|2024-02-11 16:52:...|
|2024-02-11 19:47:...|gold.db|gold_d_brewery_state|            0|2024-02-11 16:47:...|2024-02-11 16:47:...|
|2024-02-11 19:52:...|gold.db|gold_d_brewery_state|            0|2024-02-11 16:52:...|2024-02-11 16:52:...|
|2024-02-11 19:52:...|gold.db| gold_d_brewery_city|            0|2024-02-11 16:52:...|2024-02-11 16:52:...|
|2024-02-11 19:47:...|gold.db| gold_d_brewery_city|            0|2024-02-11 16:47:...|2024-02-11 16:47:...|
|2024-02-11 19:47:...|gold.d

In [5]:
# Distinct (db, table) pairs recorded for the gold database in the log table.
gold_tables_logged = (
    spark.read.format("delta")
    .load("warehouse/info.db/log")
    .select('db', 'table')
    .filter("db = 'gold.db'")
    .distinct()
)

# Map the text after the last underscore of each table name to its '<db>/<table>' path,
# e.g. 'state' -> 'gold.db/gold_d_brewery_state'. Fields are read by name instead of by
# position, so the mapping does not depend on the column order of the select above.
dict_delta_tables_gold_brew = {
    row['table'].split('_')[-1]: "/".join([row['db'], row['table']])
    for row in gold_tables_logged.collect()
}

In [6]:
def load_gold_table(short_name: str):
    """Load the gold Delta table registered under ``short_name`` in ``dict_delta_tables_gold_brew``.

    Args:
        short_name: Key of the mapping, e.g. ``'country'``.

    Returns:
        The DataFrame read from ``'warehouse/'`` plus the mapped ``'<db>/<table>'`` path.

    Raises:
        KeyError: If ``short_name`` is not in the mapping. The message lists the known keys,
            instead of the ``TypeError`` that ``'warehouse/' + None`` would produce.
    """
    try:
        relative_path = dict_delta_tables_gold_brew[short_name]
    except KeyError:
        known = sorted(dict_delta_tables_gold_brew)
        raise KeyError(f"No gold table registered for '{short_name}'; known tables: {known}") from None
    return spark.read.format("delta").load('warehouse/' + relative_path)


df_country = load_gold_table('country')
df_state = load_gold_table('state')
df_city = load_gold_table('city')
df_brew = load_gold_table('brewery')

---

# Getting the number of Breweries per country

In [7]:
# Display the country dimension sorted by its key column.
country_sorted = df_country.orderBy('PK')
country_sorted.show()

+--------+-------------+
|      PK|      country|
+--------+-------------+
|30000000|     Scotland|
|30000001|United States|
|30000002|      England|
|30000003|      Austria|
|30000004|       France|
|30000005|       Poland|
|30000006|      Ireland|
|30000007|     Portugal|
|30000008|  South Korea|
|30000009|  Isle Of Man|
+--------+-------------+


We can use the `PK` column to join with the `p_country` column in `df_brew`, as shown below.

In [8]:
# Preview ten breweries with the country key each one carries.
brew_preview = df_brew.select('id', 'p_country', 'name').limit(10)
brew_preview.show()

                                                                                

+--------------------+---------+--------------------+
|                  id|p_country|                name|
+--------------------+---------+--------------------+
|1b2570c6-9396-41a...| 30000001|   Peter B's Brewpub|
|1859086e-a38e-4e0...| 30000001|  Ironmonger Brewing|
|0cb99662-37d0-4a6...| 30000001|Pumphouse Brewery...|
|047e6c05-5c5f-4ee...| 30000001|Buzz Bomb Brewing Co|
|051edb66-62ad-455...| 30000001|    Asher Brewing Co|
|053d96fd-337c-4d7...| 30000001|Hoppin' Frog Brew...|
|06f29dde-6f82-4e2...| 30000001|      M.i.a. Beer Co|
|0c9c1cb2-6f27-432...| 30000001|Prison Hill Brewi...|
|16260ec1-907c-4e9...| 30000001|Town Branch Disti...|
|1cfa05f2-9520-470...| 30000001| Forgotten Road Ales|
+--------------------+---------+--------------------+


For that, we will first group `df_brew` by `p_country` so that we can join both DataFrames and get the number of breweries per country.

In [12]:
# Count the rows of df_brew per country key, ordered by that key.
brew_counts = (
    df_brew
    .select('id', 'p_country')
    .groupBy('p_country')
    .count()
    .orderBy('p_country')
)

# Rename the key to `PK` (the column name used by df_country) and the count to `total_breweries`.
df_brew_grouped = brew_counts.select(
    col('p_country').alias('PK'),
    col('count').alias('total_breweries'),
)
df_brew_grouped.show()



+--------+---------------+
|      PK|total_breweries|
+--------+---------------+
|30000000|             10|
|30000001|           7967|
|30000002|             62|
|30000003|             14|
|30000004|              3|
|30000005|             34|
|30000006|             70|
|30000007|             14|
|30000008|             61|
|30000009|              2|
+--------+---------------+


                                                                                

Now, with the number of breweries grouped by country, we can join both DataFrames.

In [13]:
# Attach the per-country totals to the country names, largest total first.
country_with_totals = df_country.join(df_brew_grouped, on=['PK'], how='inner')
df_country_total_brews = country_with_totals.orderBy('total_breweries', ascending=False)
df_country_total_brews.show()



+--------+-------------+---------------+
|      PK|      country|total_breweries|
+--------+-------------+---------------+
|30000001|United States|           7967|
|30000006|      Ireland|             70|
|30000002|      England|             62|
|30000008|  South Korea|             61|
|30000005|       Poland|             34|
|30000007|     Portugal|             14|
|30000003|      Austria|             14|
|30000000|     Scotland|             10|
|30000004|       France|              3|
|30000009|  Isle Of Man|              2|
+--------+-------------+---------------+


                                                                                

We can still improve on the information provided by showing the number of breweries per `brewery_type`.

For that, we create a pivot table from `df_brew`: we group by `p_country`, pivot on `brewery_type`, and count the rows in each group. Countries that have no brewery of a given type get a `null` in that column, so we use `.na.fill()` to replace those values with `0`.

In [17]:
# One row per country key, one column per brewery_type, each cell holding a row count.
brew_type_counts = (
    df_brew
    .select(col('id'), col('brewery_type'), col('p_country').alias('PK'))
    .groupBy('PK')
    .pivot('brewery_type')
    .count()
    .orderBy('PK')
    # NOTE(review): 'id' is not expected among the pivoted columns, so this drop looks like a no-op - confirm.
    .drop('id')
)

# Combinations that never occur come out of the pivot as NULL; replace them with 0
# in every column except the key-like ones.
non_count_columns = ['PK', 'p_country', 'id']
count_columns = [name for name in brew_type_counts.schema.names if name not in non_count_columns]
df_pivot_brew_type = brew_type_counts.na.fill(0, subset=count_columns)

df_pivot_brew_type.show(truncate=False)



+--------+---+-------+------+--------+-----+-----+----+--------+----------+--------+-------+
|PK      |bar|brewpub|closed|contract|large|micro|nano|planning|proprietor|regional|taproom|
+--------+---+-------+------+--------+-----+-----+----+--------+----------+--------+-------+
|30000000|0  |0      |1     |0       |0    |9    |0   |0       |0         |0       |0      |
|30000001|2  |2395   |190   |193     |74   |4131 |9   |691     |69        |213     |0      |
|30000002|0  |10     |0     |0       |1    |40   |0   |0       |0         |0       |11     |
|30000003|2  |1      |0     |0       |9    |0    |2   |0       |0         |0       |0      |
|30000004|0  |0      |0     |0       |0    |3    |0   |0       |0         |0       |0      |
|30000005|0  |11     |0     |0       |0    |20   |0   |0       |0         |3       |0      |
|30000006|0  |9      |0     |0       |3    |49   |0   |0       |0         |9       |0      |
|30000007|0  |7      |0     |0       |0    |7    |0   |0       |0     

                                                                                

Now let's join both DataFrames to see the totals and the per-type breakdown side by side.

In [22]:
# These stay in this cell on purpose: the notebook star-imports pyspark.sql.functions, which
# shadows plain-Python names such as `sum`, so the column total below is built with reduce/add.
# NOTE(review): recent PySpark releases are believed to export their own `reduce` as well -
# confirm before moving these imports above the star import.
from operator import add
from functools import reduce

# Country name, overall total and one count column per brewery type (the join key is dropped).
df_country_final = df_country_total_brews.join(df_pivot_brew_type, on=['PK'], how='inner').drop('PK')

# Sanity check: the per-type counts of every country must add up to its total_breweries.
brewery_type_columns = [
    name for name in df_country_final.schema.names
    if name not in ['PK', 'country', 'total_breweries']
]
df_total_mismatch = (
    df_country_final
    .withColumn('ValidateTotal', reduce(add, [col(name) for name in brewery_type_columns]))
    .withColumn('compare', col('ValidateTotal') - col('total_breweries'))
    .filter(col('compare') != 0)
)
mismatch_count = df_total_mismatch.count()

if mismatch_count == 0:
    df_country_final.orderBy('total_breweries', ascending=False).show(truncate=False)
else:
    # Previously a failed check printed nothing at all; report it and show the offending rows.
    print(f"Validation failed: {mismatch_count} country row(s) where the per-type counts "
          f"do not add up to total_breweries.")
    df_total_mismatch.show(truncate=False)



+-------------+---------------+---+-------+------+--------+-----+-----+----+--------+----------+--------+-------+
|country      |total_breweries|bar|brewpub|closed|contract|large|micro|nano|planning|proprietor|regional|taproom|
+-------------+---------------+---+-------+------+--------+-----+-----+----+--------+----------+--------+-------+
|United States|7967           |2  |2395   |190   |193     |74   |4131 |9   |691     |69        |213     |0      |
|Ireland      |70             |0  |9      |0     |0       |3    |49   |0   |0       |0         |9       |0      |
|England      |62             |0  |10     |0     |0       |1    |40   |0   |0       |0         |0       |11     |
|South Korea  |61             |0  |56     |0     |0       |0    |5    |0   |0       |0         |0       |0      |
|Poland       |34             |0  |11     |0     |0       |0    |20   |0   |0       |0         |3       |0      |
|Portugal     |14             |0  |7      |0     |0       |0    |7    |0   |0       |0  

                                                                                