In [1]:
import os

# Local Spark/Java installation consumed by findspark. Raw strings keep the
# Windows backslashes literal; the previous plain strings only worked because
# "\P", "\J" and "\s" happen to be invalid escapes (DeprecationWarning today,
# SyntaxWarning on newer Pythons).
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION'] = spark_version
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11.0.11"
os.environ["SPARK_HOME"] = rf"C:\spark\{spark_version}-bin-hadoop3.2"

# findspark.init() must run BEFORE pyspark is imported: it is what puts the
# SPARK_HOME python libraries on sys.path when pyspark is not pip-installed.
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("emissionsdataframe").getOrCreate()
from sqlalchemy import create_engine, insert
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import MetaData, update, Table
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
import pandas as pd
import config
from pyspark.sql.types import IntegerType,BooleanType,DateType
from pyspark.sql.functions import col
from pyspark.sql import Row


In [2]:
# `config.rds_string` supplies everything after the "postgresql://" scheme of
# the SQLAlchemy URL (kept in the untracked config module, not in the notebook).
rds_string = config.rds_string
connection_url = f'postgresql://{rds_string}'
engine = create_engine(connection_url)
conn = engine.connect()
metadata = MetaData(engine)

In [3]:
def sql_to_spark(table):
    """Load every row of a database table into a Spark DataFrame.

    The table is reflected through the module-level ``engine``, fully fetched
    with a short-lived SQLAlchemy session, and converted with the module-level
    ``spark`` session.

    Parameters
    ----------
    table : str
        Name of the table to reflect and read.

    Returns
    -------
    pyspark.sql.DataFrame
        One Spark row per database row, fields in table-column order, with
        every NULL replaced by ``0.0``.

    Raises
    ------
    ValueError
        If the table has no rows (Spark cannot infer a schema from nothing).
    """
    from pyspark.sql import Row
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import MetaData, Table

    metadata = MetaData()
    table_var = Table(table, metadata, autoload=True, autoload_with=engine)
    columns = table_var.columns.keys()

    session = sessionmaker(bind=engine)()
    try:
        rows = session.query(table_var).all()
    finally:
        # Release the pooled connection; previously every call leaked one.
        session.close()

    # NOTE(review): NULL -> 0.0 is applied to every column, including text and
    # integer ones; Spark's schema inference cannot merge e.g. str with float,
    # so a NULL in such a column may make createDataFrame fail — confirm intent.
    records = [
        {key: (0.0 if value is None else value) for key, value in zip(columns, row)}
        for row in rows
    ]
    if not records:
        raise ValueError(
            f"Table {table!r} returned no rows; cannot infer a Spark schema."
        )
    return spark.createDataFrame(Row(**record) for record in records)

def query_maker(table):
    """Reflect ``table`` via the module-level ``engine`` and fetch all its rows.

    Parameters
    ----------
    table : str
        Name of the table to reflect and read.

    Returns
    -------
    tuple
        ``(rows, column_names)`` where ``rows`` is the list returned by
        ``session.query(table).all()`` and ``column_names`` lists the table's
        columns in declaration order (matching each row's field order).
    """
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import MetaData, Table

    metadata = MetaData()
    table_var = Table(table, metadata, autoload=True, autoload_with=engine)
    column_names = table_var.columns.keys()

    session = sessionmaker(bind=engine)()
    try:
        rows = session.query(table_var).all()
    finally:
        # Release the pooled connection; previously every call leaked one.
        session.close()
    return (rows, column_names)

def spark_maker(query, country, columns=None):
    """Build a Spark DataFrame from the rows of ``query`` for one country.

    Parameters
    ----------
    query : iterable of row tuples
        Rows such as the first element returned by ``query_maker``. The first
        field of each row is compared against ``country`` (assumed to be the
        country column, e.g. ``country_or_area`` — confirm for other tables).
    country : str
        Value the first field must equal for a row to be kept.
    columns : sequence of str, optional
        Field names, in row order (e.g. the second element returned by
        ``query_maker``). When omitted, falls back to a global named
        ``steve`` for backward compatibility. The old code always read that
        global and raised NameError when it did not exist.

    Returns
    -------
    pyspark.sql.DataFrame
        Matching rows, with every NULL replaced by ``0.0``.

    Raises
    ------
    ValueError
        If no row matches ``country``.
    """
    if columns is None:
        columns = steve
    records = [
        {key: (0.0 if value is None else value) for key, value in zip(columns, row)}
        for row in query
        if row[0] == country
    ]
    if not records:
        raise ValueError(f"No rows found for country {country!r}.")
    return spark.createDataFrame(Row(**record) for record in records)

def sql_spark_trade(table, country):
    """Load one country's rows of a trade table into a Spark DataFrame.

    Filtering is done in SQL on the ``country_or_area`` column, so ``table``
    must have that column. Uses the module-level ``engine`` and ``spark``.

    Parameters
    ----------
    table : str
        Name of the table to reflect and read (e.g. ``"global_trade"``).
    country : str
        Exact ``country_or_area`` value to select.

    Returns
    -------
    pyspark.sql.DataFrame
        Matching rows, fields in table-column order, with every NULL replaced
        by ``0.0``.

    Raises
    ------
    ValueError
        If no row matches ``country`` (Spark cannot infer a schema from
        an empty result).
    """
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import MetaData, Table

    metadata = MetaData()
    table_var = Table(table, metadata, autoload=True, autoload_with=engine)
    columns = table_var.columns.keys()

    session = sessionmaker(bind=engine)()
    try:
        rows = (
            session.query(table_var)
            .filter(table_var.c.country_or_area == country)
            .all()
        )
    finally:
        # Release the pooled connection; previously every call leaked one.
        session.close()

    # The SQL WHERE clause already selects the country. The old extra Python
    # check on row[0] silently assumed country_or_area was the first column.
    records = [
        {key: (0.0 if value is None else value) for key, value in zip(columns, row)}
        for row in rows
    ]
    if not records:
        raise ValueError(
            f"No rows in {table!r} with country_or_area == {country!r}."
        )
    return spark.createDataFrame(Row(**record) for record in records)

In [4]:
from sqlalchemy import MetaData, Table
metadata = MetaData(engine)
# Reflect the global_trade table definition and display its column names.
global_trade = Table(
    "global_trade",
    metadata,
    autoload_with=engine,
    autoload=True,
)
trade_schema = global_trade.columns.keys()
trade_schema

In [14]:
india_trade_spark = sql_spark_trade(table="global_trade", country="India")


In [17]:
india_trade_spark.show(n=20, truncate=True)

+---------------+----+---------+--------------------+------+---------+-----------+-------------------+-----------+--------------------+
|country_or_area|year|comm_code|           commodity|  flow|trade_usd|  weight_kg|      quantity_name|   quantity|            category|
+---------------+----+---------+--------------------+------+---------+-----------+-------------------+-----------+--------------------+
|          India|2016|   210110|Coffee extracts, ...|Import| 13371521|  1881816.0|Weight in kilograms|  1881816.0|21_miscellaneous_...|
|          India|2016|   210110|Coffee extracts, ...|Export|276962932|4.0752197E7|Weight in kilograms|4.0752197E7|21_miscellaneous_...|
|          India|2016|   210120|Tea and mate extr...|Import|  1988772|   105363.0|Weight in kilograms|   105363.0|21_miscellaneous_...|
|          India|2016|   210120|Tea and mate extr...|Export| 43467004|  6351410.0|Weight in kilograms|  6351410.0|21_miscellaneous_...|
|          India|2016|   210130|Chicory & other 

In [None]:
# "Country " (with its trailing space) is a column of the GDP frame, not of the
# India trade frame (whose country column is `country_or_area`), so the old
# expression always failed. Requires `spark_gdp = sql_to_spark('gdp_data')` to have run.
spark_gdp.filter(spark_gdp["Country "] == "Afghanistan").select(["Country "]).show()

In [None]:
# `sparkDF` was never defined (NameError). The "Country " column belongs to the
# GDP frame; requires `spark_gdp = sql_to_spark('gdp_data')` to have run.
spark_gdp.select(["Country "]).show()

In [27]:
spark_gdp = sql_to_spark(table="gdp_data")

In [28]:
spark_india = sql_to_spark(table="india_export_data")

In [29]:
spark_emissions = sql_to_spark(table="global_emissions")

In [18]:
# Replace the `year` column with an integer-typed copy of itself.
india_trade_spark_y = india_trade_spark.withColumn(
    "year",
    india_trade_spark["year"].cast("int"),
)

In [19]:
# Confirm the cast above took effect: `year` should now be reported as integer.
india_trade_spark_y.printSchema()

root
 |-- country_or_area: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- comm_code: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- flow: string (nullable = true)
 |-- trade_usd: long (nullable = true)
 |-- weight_kg: double (nullable = true)
 |-- quantity_name: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- category: string (nullable = true)



In [20]:
# Show the single earliest-year record.
india_trade_spark_y.sort(india_trade_spark_y["year"].asc()).show(n=1)

+---------------+----+---------+--------------------+------+---------+---------+-------------------+--------+--------------------+
|country_or_area|year|comm_code|           commodity|  flow|trade_usd|weight_kg|      quantity_name|quantity|            category|
+---------------+----+---------+--------------------+------+---------+---------+-------------------+--------+--------------------+
|          India|1988|   420100|Saddlery and harn...|Import|      151|     11.0|Weight in kilograms|    11.0|42_articles_of_le...|
+---------------+----+---------+--------------------+------+---------+---------+-------------------+--------+--------------------+
only showing top 1 row



In [21]:
india_export_df = india_trade_spark_y.where(india_trade_spark_y.flow == "Export")

In [23]:
india_import_df = india_trade_spark_y.where(india_trade_spark_y.flow == "Import")

In [24]:
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable

In [25]:


def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert a Spark :class:`DataFrame` from wide to long format.

    Spark analogue of ``pandas.melt``: every input row yields one output row
    per column in ``value_vars``.

    Parameters
    ----------
    df : DataFrame
        Wide input frame.
    id_vars : iterable of str (or a single str)
        Columns copied unchanged onto every output row.
    value_vars : iterable of str (or a single str)
        Columns to unpivot. They are packed into one Spark array, so they must
        have mutually compatible types.
    var_name : str
        Name of the output column holding the source column name.
    value_name : str
        Name of the output column holding the source column value.

    Returns
    -------
    DataFrame
        Columns ``id_vars + [var_name, value_name]``.
    """
    # Accept any iterable (tuples, generators) and a bare column name. The old
    # `id_vars + [...]` required a list, and a bare string `value_vars` was
    # silently split into single characters.
    id_cols = [id_vars] if isinstance(id_vars, str) else list(id_vars)
    melt_cols = [value_vars] if isinstance(value_vars, str) else list(value_vars)

    # array<struct<var_name: <literal column name>, value_name: <column value>>>
    vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in melt_cols))

    # One row per array element, i.e. per (input row, value column) pair.
    exploded = df.withColumn("_vars_and_vals", explode(vars_and_vals))

    cols = id_cols + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return exploded.select(*cols)

In [30]:
spark_gdp.show(n=20, truncate=True)

+--------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----+
|            Country |Country Code|       1990|       1991|       1992|       1993|       1994|       1995|       1996|       1997|       1998|       1999|       2000|       2001|       2002|       2003|       2004|       2005|       2006|       2007|       2008|       2009|       2010|       2011|       2012|       2013|       2014|       2015|       2016|       2017|       2018|2019|
+--------------------+------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-------