### MONGODB

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,explode

# Create a Spark session
spark = SparkSession.builder \
    .appName("MongoDBMflixAnalysis") \
    .config("spark.mongodb.input.uri", f"mongodb+srv://student:student@cluster0.koi0v.mongodb.net") \
    .config("spark.mongodb.output.uri", f"mongodb+srv://student:student@cluster0.koi0v.mongodb.net") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()
sales_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb+srv://student:student@cluster0.koi0v.mongodb.net/sample_supplies.sales").load()

from pyspark.sql import functions as F

# Select relevant columns and explode the "items" array
customer_and_sales = sales_df.select(
    sales_df["storeLocation"].alias("CITY"), 
    sales_df["customer"],
    explode("items").alias("item")
)

# Calculate total sales by city
total_sales_by_city = customer_and_sales.groupBy("CITY").agg(
    F.sum(F.col("item.quantity")).alias("TOTAL_SALES")
)

# Calculate total distinct customers by city
total_customers_by_city = customer_and_sales.groupBy("CITY").agg(
    F.countDistinct("customer").alias("NUM_OF_CUSTOMERS")
)

# Join the two DataFrames on the "City" column
df_mongodb = total_sales_by_city.join(
    total_customers_by_city, 
    "CITY", 
    "inner"
).orderBy("TOTAL_SALES")

# Show the merged DataFrame
df_mongodb.show(truncate=False)

:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/azureuser/.ivy2/cache
The jars for the packages stored in: /home/azureuser/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-34a61a15-6653-40ee-9ede-e240317c1e65;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 203ms :: artifacts dl 10ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   art

+---------+-----------+----------------+
|CITY     |TOTAL_SALES|NUM_OF_CUSTOMERS|
+---------+-----------+----------------+
|San Diego|7540       |346             |
|New York |11258      |501             |
|Austin   |15351      |676             |
|London   |17710      |794             |
|Seattle  |24853      |1134            |
|Denver   |34155      |1549            |
+---------+-----------+----------------+



### Hadoop

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create a Spark session
spark = SparkSession.builder.appName("AdventureWorksAnalysis").getOrCreate()

# Set the Hadoop file path
hadoop_path = "hdfs://localhost:9000/user/input/adventureworks/"

# Read CSV files into PySpark DataFrames
customers = spark.read.csv(hadoop_path + "customers.csv", header=True, inferSchema=True)
employees = spark.read.csv(hadoop_path + "employees.csv", header=True, inferSchema=True)
orders = spark.read.csv(hadoop_path + "orders.csv", header=True, inferSchema=True)
product_categories = spark.read.csv(hadoop_path + "productcategories.csv", header=True, inferSchema=True)
products = spark.read.csv(hadoop_path + "products.csv", header=True, inferSchema=True)
product_subcategories = spark.read.csv(hadoop_path + "productsubcategories.csv", header=True, inferSchema=True)
vendor_product = spark.read.csv(hadoop_path + "vendorproduct.csv", header=True, inferSchema=True)
vendors = spark.read.csv(hadoop_path + "vendors.csv", header=True, inferSchema=True)

joined_df = orders.join(employees, orders["EmployeeID"] == employees["EmployeeID"], "inner")

# Perform the first aggregation
total_sales = joined_df.groupBy("Territory") \
                       .agg(sum("LineTotal").alias("TOTAL_SALES")) \
                       .withColumnRenamed("Territory", "CITY")

# Perform the second aggregation
distinct_customers = joined_df.groupBy("Territory") \
                              .agg(countDistinct("CustomerID").alias("NUM_OF_CUSTOMERS")) \
                              .withColumnRenamed("Territory", "CITY")

# Perform the third aggregation
#total_revenue = joined_df.groupBy("Territory") \
#                         .agg((sum("LineTotal") - sum("UnitPrice")).alias("Total_Revenue")) \
#                         .withColumnRenamed("Territory", "city")

# Merge the DataFrames together
result_CSV = total_sales.join(distinct_customers, "CITY") \
                    .orderBy("TOTAL_SALES")
                    #.join(total_revenue, "city") \
                    

# Show the result
result_CSV.show()


24/02/20 01:32:38 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+--------------+--------------------+----------------+
|          CITY|         TOTAL_SALES|NUM_OF_CUSTOMERS|
+--------------+--------------------+----------------+
|     Australia|   1421810.923837996|              33|
|       Germany|  1827066.7117509968|              32|
|          NULL|  1997407.4910679972|              87|
|        France|   4509888.930791994|              34|
|     Southeast|   7171012.749148975|              74|
|United Kingdom|   8503338.645407949|              62|
|     Northeast|   9293903.004376972|             118|
|     Northwest|   9367593.632682951|              71|
|        Canada|   9535865.570011947|             105|
|       Central|1.0065803540256996E7|             121|
|     Southwest| 1.679401297999503E7|             104|
+--------------+--------------------+----------------+



### MARIADB

In [3]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

appName = "PySpark Example"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()


server = "localhost"
port = 3306
database = "classicmodels"
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"


user = "haunguyen2"
password = "haunguyenpwd"
jdbc_driver = "org.mariadb.jdbc.Driver"

properties = {
    "user": user,
    "password": password,
    "driver": jdbc_driver
}

customers_df = spark.read.jdbc(jdbc_url, "(select * from customers) tab", properties=properties)
employees_df = spark.read.jdbc(jdbc_url, "(select * from employees) tab", properties=properties)
offices_df = spark.read.jdbc(jdbc_url, "(select * from offices) tab", properties=properties)
orderdetails_df = spark.read.jdbc(jdbc_url, "(select * from orderdetails) tab", properties=properties)
orders_df = spark.read.jdbc(jdbc_url, "(select * from orders) tab", properties=properties)
payments_df = spark.read.jdbc(jdbc_url, "(select * from payments) tab", properties=properties)
productlines_df = spark.read.jdbc(jdbc_url, "(select * from productlines) tab", properties=properties)
products_df = spark.read.jdbc(jdbc_url, "(select * from products) tab", properties=properties)

report_Maria = (
    customers_df.alias("customers")
    .join(orders_df.alias("orders"), "customerNumber", "left")
    .join(orderdetails_df.alias("order_details"), "orderNumber", "left")
    .join(payments_df.alias("payments"), "customerNumber", "left")
    .groupBy("customers.city")
    .agg(
        countDistinct("customers.customerNumber").alias("NUM_OF_CUSTOMERS"),
        sum(expr("order_details.priceEach * order_details.quantityOrdered")).alias("TOTAL_SALES")
    )
    .withColumnRenamed("customers.city", "CITY")
    .orderBy("TOTAL_SALES", ascending=False)
)

report_Maria.show()

24/02/20 01:32:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------------+----------------+-----------+
|         city|NUM_OF_CUSTOMERS|TOTAL_SALES|
+-------------+----------------+-----------+
|       Madrid|               5|11099786.57|
|   San Rafael|               1| 5326446.06|
|          NYC|               5| 1642508.02|
|   Auckland  |               2| 1030870.69|
|    Singapore|               3|  948244.37|
|    Melbourne|               1|  722340.28|
|        Paris|               3|  721949.04|
|  New Bedford|               2|  571500.03|
| North Sydney|               1|  548136.88|
|       Nantes|               2|  542662.44|
|    Kobenhavn|               1|  516340.48|
|        Reims|               1|  507932.76|
|San Francisco|               2|  464813.24|
|   Manchester|               1|  445230.27|
|    Minato-ku|               1|  422194.92|
|   Burlingame|               1|  418180.88|
|      Stavern|               1|  416899.16|
|     Salzburg|               1|  412440.21|
|    Chatswood|               1|  401721.36|
|   Brickh

### SNOWFLAKE

In [4]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Snowflake Query") \
    .config("spark.jars", "/usr/local/spark/jars/snowflake-jdbc-3.14.5.jar,/usr/local/spark/jars/spark-snowflake_2.13-2.14.0-spark_3.4.jar") \
    .getOrCreate()

# Set Snowflake connection properties
sfOptions = {
  "sfURL" : "https://tltmqjz-yk82271.snowflakecomputing.com",
  "sfAccount" : "YK82271",
  "sfUser" : "HOWARDNGUYEN29",
  "sfPassword" : "password", #oi29
  "sfDatabase" : "GBCTRAINING3",
  "sfSchema" : "PUBLIC",
  "sfWarehouse" : "COMPUTE_WH",
}

# Define the query
query = """
    SELECT City, Num_of_customers, total_sales
    FROM (
        SELECT City, COUNT(DISTINCT Customers) AS Num_of_customers, SUM(quantity * price) AS total_sales
        FROM (
            SELECT 
                SAMPLE_SUPPLIES.STORELOCATION AS City,
                SAMPLE_SUPPLIES.CUSTOMER AS Customers,
                TO_NUMBER(GET(GET(ITEM.VALUE, 'quantity'), '$numberInt')) AS quantity,
                TO_NUMBER(GET(GET(ITEM.VALUE, 'price'), '$numberDecimal')) AS price
            FROM
                SAMPLE_SUPPLIES,
            LATERAL FLATTEN(INPUT => PARSE_JSON(SAMPLE_SUPPLIES.items)) AS item
        ) AS subquery1
        GROUP BY City

        UNION

        SELECT city, count(CUSTOMER_ID) AS Num_of_customers, SUM(sales) AS total_sales
        FROM SUPERSTORE
        GROUP BY city
    ) AS combined_data
    ORDER BY total_sales;
"""

# Load data from Snowflake using the defined query
df = spark.read.format("snowflake") \
    .options(**sfOptions) \
    .option("query", query) \
    .load()

# Show the result
df.show()



24/02/20 01:32:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-----------------+----------------+-----------+
|             CITY|NUM_OF_CUSTOMERS|TOTAL_SALES|
+-----------------+----------------+-----------+
|          Abilene|               1|      1.392|
|           Elyria|               1|      1.824|
|          Jupiter|               1|      2.064|
|        Pensacola|               1|      2.214|
|     Ormond Beach|               1|      2.808|
|  San Luis Obispo|               1|       3.62|
|       Springdale|               1|        4.3|
|           Layton|               1|       4.96|
|           Keller|               1|          6|
|    Missouri City|               1|       6.37|
|        Deer Park|               1|      6.924|
|      Port Orange|               1|      7.824|
|         Billings|               1|      8.288|
|       Romeoville|               1|      8.952|
|        Iowa City|               1|       9.99|
|          Baytown|               1|     10.368|
|        Rock Hill|               1|      11.85|
|      Chapel Hill| 

### MERGING

In [14]:
merged_df = df.union(report_Maria).union(df).union(df_mongodb)
# merged_df = merged_df.withColumn("TOTAL_SALES", format_number("TOTAL_SALES", 2))

# Filter out rows with null values in the TOTAL_SALES column
merged_df = merged_df.filter(merged_df.TOTAL_SALES.isNotNull())

# Keep only distinct cities
merged_df = merged_df.dropDuplicates(["CITY"])

# Show the merged DataFrames
merged_df = merged_df.orderBy("TOTAL_SALES")
merged_df.show()

24/02/20 01:43:57 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:43:57 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:43:57 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:43:59 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:43:59 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:43:59 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:44:02 WARN SnowflakeStrategy: Pushdown failed :null     (0 + 1) / 1]
24/02/20 01:44:02 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:44:02 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:44:04 WARN SnowflakeStrategy: Pushdown failed :null                 
24/02/20 01:44:04 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:44:04 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:44:06 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:44:06 WARN SnowflakeStrategy: Pushdown failed :null
24/02/20 01:44:06 WARN SnowflakeStrategy: Pushdown failed :null
24/02/

+-----------------+----------------+-----------+
|             CITY|NUM_OF_CUSTOMERS|TOTAL_SALES|
+-----------------+----------------+-----------+
|          Abilene|               1|      1.392|
|           Elyria|               1|      1.824|
|          Jupiter|               1|      2.064|
|        Pensacola|               1|      2.214|
|     Ormond Beach|               1|      2.808|
|  San Luis Obispo|               1|       3.62|
|       Springdale|               1|        4.3|
|           Layton|               1|       4.96|
|           Keller|               1|        6.0|
|    Missouri City|               1|       6.37|
|        Deer Park|               1|      6.924|
|      Port Orange|               1|      7.824|
|         Billings|               1|      8.288|
|       Romeoville|               1|      8.952|
|        Iowa City|               1|       9.99|
|          Baytown|               1|     10.368|
|        Rock Hill|               1|      11.85|
|      Chapel Hill| 

## Distribute merged outcomes into SSMS

In [10]:
!pip install sqlalchemy
!pip install pyodbc

Defaulting to user installation because normal site-packages is not writeable
Collecting pyodbc
  Downloading pyodbc-5.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.7 kB)
Downloading pyodbc-5.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (334 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m334.7/334.7 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: pyodbc
Successfully installed pyodbc-5.1.0


### Using ODBC (instead of JBDC)
### Should install ODBC Driver Manager on linux by sudo apt update
### Should install ODBC+Driver+18+for+SQL+Server by https://learn.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-ver16&tabs=alpine18-install%2Calpine17-install%2Cdebian8-install%2Credhat7-13-install%2Crhel7-offline (choos Ubuntu)

In [21]:
import pandas as pd
import sqlalchemy

# Collect data from Spark DataFrame to the driver node
merged_df_collected = merged_df.collect()

# Convert collected data to Pandas DataFrame
pandas_df = pd.DataFrame(merged_df_collected, columns=merged_df.columns)

# Define database connection string
connection_string = 'mssql+pyodbc://azureuser:password@howardserver.database.windows.net/myfirstdatabase?driver=ODBC+Driver+18+for+SQL+Server'

# Create SQLAlchemy engine
engine = sqlalchemy.create_engine(connection_string)

# Connect to the database
connection = engine.connect()

# Define the table name
table_name = 'SaleOnCity'

# Create a table in the database based on the DataFrame schema
pandas_df.to_sql(table_name, connection, if_exists='replace', index=False)

# Close the database connection
connection.close()