# Entregable # 1

El primer entregable para CODER HOUSE debe extraer datos de una API publica y crear una tabla en Redshift. Una vez creada la tabla, se debe cargar los datos en Redshift.

![arquitectura_propuesta](images/_home_marm1984_github_data_engineering_coder_house_entregable_uno_entrega_1_spark.png)

In [1]:
from colorama import Back, Fore, Style
import pandas as pd
import requests
import psycopg2
import random
import os

In [2]:
# Function that requests over HTTP a JSON object from a given URL

JSON = int | str | float | bool | None | dict[str, "JSON"] | list["JSON"]
JSONObject = dict[str, JSON]

def http_get_sync(url: str) -> JSONObject:
    """Synchronously performs an HTTP GET request and returns the JSON response."""
    try :
        print(Back.BLACK + Fore.CYAN + "GET: " + url + Style.RESET_ALL)
        return requests.get(url).json()
    except:
        print(Back.BLACK + Fore.RED + "ERROR: " + url + Style.RESET_ALL)
        return {}


In [3]:
# The limit parameters for the API calls

API_KEY = os.environ.get("NAPSTER_API_KEY")
OFFSET = 1000  # Number of artists to seed in db

In [4]:
# The URL for the API call

# artist_page_offset = 116 # johnny-cash 212 Bob Dylan
artist_page_offset = random.randint(0, OFFSET)
napster_url = f'https://napi-v2-2-cloud-run-b3gtd5nmxq-uw.a.run.app/v2.2/artists/top{API_KEY}&limit=1&offset={artist_page_offset}'

In [5]:
napster_url

'https://napi-v2-2-cloud-run-b3gtd5nmxq-uw.a.run.app/v2.2/artists/top?apikey=MjZkYmFhZTctMjFkZi00NjY3LWEwNGMtZDYzNmQ4YmM3OThi&limit=1&offset=80'

In [6]:
# Get the JSON object from the URL

napster_json = http_get_sync(napster_url)

[40m[36mGET: https://napi-v2-2-cloud-run-b3gtd5nmxq-uw.a.run.app/v2.2/artists/top?apikey=MjZkYmFhZTctMjFkZi00NjY3LWEwNGMtZDYzNmQ4YmM3OThi&limit=1&offset=80[0m


In [7]:
napster_json

{'artists': [{'type': 'artist',
   'id': 'art.12313694',
   'href': 'https://api.napster.com/v2.2/artists/art.12313694',
   'name': 'Tiësto & Junkie Xl',
   'shortcut': 'tiesto',
   'blurbs': [],
   'bios': [{'title': 'Napster',
     'author': 'Napster',
     'publishDate': '',
     'bio': "A favorite among Progressive Trance followers for his epic DJ sets and \r\nlarger-than-life original tunes, Holland-born DJ Tiesto rose to the \r\nforefront of the genre in record time. Not unlike contemporaries BT, Ferry \r\nCorsten, Paul Van Dyk and Sasha & Digweed (to name but a few), Tiesto's take \r\non Trance is anything but alienating. His own tracks are grandiose efforts \r\nfeaturing wispy female vocals embedded in a hypnotic atmosphere awash with \r\ndream-like synths."}],
   'albumGroups': {'compilations': ['alb.777630936',
     'alb.777629747',
     'alb.777618985',
     'alb.769898997',
     'alb.769872163',
     'alb.767591269',
     'alb.766713656',
     'alb.765441309',
     'alb.764

In [8]:
# Get all the keys inside artist object

for key in napster_json['artists'][0].keys():
    print(key)

type
id
href
name
shortcut
blurbs
bios
albumGroups
links


In [9]:
# Get all the keys inside artist object

for key in napster_json['artists'][0]['links'].keys():
    print(key)

albums
images
posts
topTracks
genres
stations
contemporaries
influences
relatedProjects
followers


In [10]:
# Get all the keys inside artist object

for key in napster_json['artists'][0]['links']['images'].keys():
    print(key)

href


# Load to Redshift

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

In [12]:
driver_postgres_path = "/home/marm1984/github/data_engineering_coder_house/entregable_uno/jar_files/postgresql-42.2.27.jre7.jar"
# os.environ["PYSPARK_SUBMIT_ARGS"] = f"--driver-class-path {driver_postgres_path} --jars {driver_postgres_path} pyspark-shell"
# os.environ["SPARK_CLASSPATH"] = driver_postgres_path

spark = SparkSession.builder \
    .master("local") \
    .appName("napster") \
    .config("spark.jars", driver_postgres_path) \
    .getOrCreate()
    

your 131072x1 screen size is bogus. expect trouble
23/06/08 21:36:52 WARN Utils: Your hostname, Geomario-Desktop resolves to a loopback address: 127.0.1.1; using 192.168.140.9 instead (on interface eth0)
23/06/08 21:36:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/06/08 21:36:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [13]:
spark

# Creating Table

The table for the artist, creates an Artist. Each artist has an ID, type, blurbs, name, hred and shorcut.

## Artist Table 

![Artist Table](images/artist_table.png)

# Data Structures

id = string  
type = string  
href = string  
name = string  
shortcut = string

# PostgreSQL

Create the table and perform the postgreSQL commands

In [14]:
# Import environment variables

host = os.environ['HOST']
port = os.environ['PORT']
user = os.environ['USER']
password = os.environ['PASSWORD']
database = os.environ['DATABASE']

In [15]:
# Create a connection to the database

try:
    conn = psycopg2.connect(
    host=host,
    port=port,
    database=database,
    user = user,
    password = password
    )
    print(Back.BLACK + Fore.GREEN + "SUCCESS: Connection to database" + Style.RESET_ALL)
except psycopg2.Error as e:
    print(Back.BLACK + Fore.RED + "ERROR: Connection to database" + Style.RESET_ALL)
    print(e)
    

[40m[32mSUCCESS: Connection to database[0m


In [16]:
# # Check the connection
personal_schema = os.environ['PERSONAL_SCHEMA']

cur = conn.cursor()
#  Check schema marm1984_coderhouse	
if cur:
    print("Connected to Redshift")
else:
    print("Connection failed")
 
#  Check schema exists from CODER
cur.execute(f"SELECT * FROM information_schema.tables WHERE table_schema = '{personal_schema}';")
print(cur.fetchall())


Connected to Redshift
[('data-engineer-database', 'marm1984_coderhouse', 'artist', 'BASE TABLE', None, None, None, None, None), ('data-engineer-database', 'marm1984_coderhouse', 'artist_napster', 'BASE TABLE', None, None, None, None, None)]


In [17]:
# Drop table if exists

cur.execute(f"DROP TABLE IF EXISTS {personal_schema}.artist_napster;")

In [18]:
# Create a table for artists

try:
    cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {personal_schema}.artist_napster (
                    id VARCHAR(255) PRIMARY KEY,
                    name VARCHAR(255),
                    shortcut VARCHAR(255),
                    url VARCHAR(255),
                    type VARCHAR(255)
                );
                """)
    print(Back.BLACK + Fore.GREEN + "SUCCESS: Table created" + Style.RESET_ALL)
except:
    print(Back.BLACK + Fore.RED + "ERROR: Table not created" + Style.RESET_ALL)
    

[40m[32mSUCCESS: Table created[0m


In [19]:
napster_json['artists']

[{'type': 'artist',
  'id': 'art.12313694',
  'href': 'https://api.napster.com/v2.2/artists/art.12313694',
  'name': 'Tiësto & Junkie Xl',
  'shortcut': 'tiesto',
  'blurbs': [],
  'bios': [{'title': 'Napster',
    'author': 'Napster',
    'publishDate': '',
    'bio': "A favorite among Progressive Trance followers for his epic DJ sets and \r\nlarger-than-life original tunes, Holland-born DJ Tiesto rose to the \r\nforefront of the genre in record time. Not unlike contemporaries BT, Ferry \r\nCorsten, Paul Van Dyk and Sasha & Digweed (to name but a few), Tiesto's take \r\non Trance is anything but alienating. His own tracks are grandiose efforts \r\nfeaturing wispy female vocals embedded in a hypnotic atmosphere awash with \r\ndream-like synths."}],
  'albumGroups': {'compilations': ['alb.777630936',
    'alb.777629747',
    'alb.777618985',
    'alb.769898997',
    'alb.769872163',
    'alb.767591269',
    'alb.766713656',
    'alb.765441309',
    'alb.764441459',
    'alb.765798384',


# SPARK

In [24]:
# Create spark dataframe from JSON
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, ArrayType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("shortcut", StringType(), True),
    StructField("url", StringType(), True),
    StructField("type", StringType(), True)
])

df_artist = spark.createDataFrame(napster_json['artists'] , schema=schema)

In [25]:
df_artist.show()

+------------+------------------+--------+----+------+
|          id|              name|shortcut| url|  type|
+------------+------------------+--------+----+------+
|art.12313694|Tiësto & Junkie Xl|  tiesto|null|artist|
+------------+------------------+--------+----+------+



                                                                                

In [26]:
# Drop columns is not necessary

columns_table = ['id', 'name', 'shortcut', 'href', 'type']

for column in df_artist.columns:
    if column not in columns_table:
        df_artist = df_artist.drop(column)

In [27]:
df_artist.show()

+------------+------------------+--------+------+
|          id|              name|shortcut|  type|
+------------+------------------+--------+------+
|art.12313694|Tiësto & Junkie Xl|  tiesto|artist|
+------------+------------------+--------+------+



In [28]:
df_artist.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- shortcut: string (nullable = true)
 |-- type: string (nullable = true)



In [29]:
df_artist.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}:{port}/{database}") \
    .option("dbtable", f"{personal_schema}.artist_napster") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()
    

23/06/08 21:39:51 WARN PgConnection: Unsupported Server Version: 8.0.2
23/06/08 21:40:40 WARN PgConnection: Unsupported Server Version: 8.0.2 + 1) / 1]
23/06/08 21:40:45 WARN PgConnection: Unsupported Server Version: 8.0.2          


In [30]:
# Query the table from Redshift with spark

df_artist = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}:{port}/{database}") \
    .option("dbtable", f"{personal_schema}.artist_napster") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .load()
    

23/06/08 21:44:50 WARN PgConnection: Unsupported Server Version: 8.0.2


In [31]:
df_artist.show()

23/06/08 21:45:01 WARN PgConnection: Unsupported Server Version: 8.0.2
[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+------------+------------------+-----------+------+
|                href|          id|              name|   shortcut|  type|
+--------------------+------------+------------------+-----------+------+
|https://api.napst...|   art.43394|       Spice Girls|spice-girls|artist|
|                null|art.12313694|Tiësto & Junkie Xl|     tiesto|artist|
+--------------------+------------+------------------+-----------+------+



                                                                                