In [26]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf      # sf = spark functions
import pyspark.sql.types as st          # st = spark types
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud.exceptions import NotFound
import os

# Local Spark session reused by the rest of the notebook.
sparkql = pyspark.sql.SparkSession.builder.master('local').getOrCreate()

data_dir = '../data'

file_names = ['ADBE','AMZN', 'CRM', 'CSCO', 'GOOGL', 'IBM','INTC','META','MSFT','NFLX','NVDA','ORCL','TSLA'] #excluded AAPL to start df

# NOTE(review): this schema string is never passed to a reader below, so the CSV
# columns keep whatever type Spark's CSV reader assigns by default. It also lists
# stock_name first, which does not match the CSV column order -- confirm before wiring it in.
columns = 'stock_name string, date string, open float, high float, low float, close float, adj_close float, volume int' #schema to use

# Column names applied to every raw CSV, in file order.
STOCK_COLUMNS = ['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume']


def load_stock_csv(ticker, directory=data_dir):
    """Read `<directory>/<ticker>.csv` and return it in the consolidated layout.

    The CSV columns are renamed to STOCK_COLUMNS, a `stock_name` column holding
    `ticker` is added, and a composite key `sd_id` (stock_name concatenated with
    date) is placed first.

    Side effect: (re)registers the Spark temp view "key" for the frame being
    transformed, exactly as the original inline code did.
    """
    frame = sparkql.read.csv(os.path.join(directory, ticker + '.csv'), header=True)
    frame = frame.toDF(*STOCK_COLUMNS) #rename the columns
    frame = frame.withColumn('stock_name', sf.lit(ticker)) #add column with stock name

    #create composite key
    frame.createOrReplaceTempView("key")
    return sparkql.sql("SELECT CONCAT(stock_name, date) AS sd_id, stock_name, date, open, high, low, close, adj_close, volume FROM key")


# AAPL seeds the consolidated frame; every other ticker is appended in list order.
df = load_stock_csv('AAPL')
for ticker in file_names:
    df = df.union(load_stock_csv(ticker))

# Write to parquet file. Used coalesce in order to have one parquet file.
# mode("overwrite") lets the cell be re-run: without it the save raises as soon as
# the output directory already exists.
df.coalesce(1).write.format("parquet").mode("overwrite").save(os.path.join(data_dir,'all_tech_stocks.parquet'))

In [27]:
PROJECT_NAME = 'team-week-3'
DATASET_NAME = 'tech_stocks_world_events'

# Service-account key location. The GOOGLE_APPLICATION_CREDENTIALS environment
# variable takes precedence so the notebook is not tied to one machine; the
# original absolute path is kept only as a fallback for the existing setup.
key_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/Users/Ruben/Desktop/google_cred/.cred/team_project_3/team-week-3-2f1d10dceea4.json",
)

credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

#create bigquery client
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

#create dataset_id and table_ids
dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
table_id = f"{PROJECT_NAME}.{DATASET_NAME}.stocks"

# Directory produced by the Spark parquet write (one part file plus side files).
data_dir = '../data/all_tech_stocks.parquet'

# Suffix of the single data file inside that directory.
parq = '.snappy.parquet'

# Stable name the load step reads from.
DATA_FILE = os.path.join(data_dir,'stocks.parquet')

# Give the part file a predictable name. Matching on the file-name suffix skips
# the '.crc' checksum side files, whose names end in '.crc' rather than in `parq`.
for file_name in os.listdir(data_dir):
    source = os.path.join(data_dir, file_name)
    if file_name.endswith(parq):
        os.rename(source, DATA_FILE)

# (name, BigQuery type, mode) for each column of the stocks table, in table order.
# NOTE(review): adj_close is declared STRING while the other price columns are
# FLOAT -- looks unintentional; confirm against the loaded data before changing it.
_STOCK_FIELDS = (
    ('sd_id', 'STRING', 'REQUIRED'),
    ('stock_name', 'STRING', 'NULLABLE'),
    ('date', 'DATE', 'NULLABLE'),
    ('open', 'FLOAT', 'NULLABLE'),
    ('high', 'FLOAT', 'NULLABLE'),
    ('low', 'FLOAT', 'NULLABLE'),
    ('close', 'FLOAT', 'NULLABLE'),
    ('adj_close', 'STRING', 'NULLABLE'),
    ('volume', 'INTEGER', 'NULLABLE'),
)

# Schema handed to bigquery.Table when the stocks table is created.
TABLE_SCHEMA = [
    bigquery.SchemaField(field_name, field_type, mode=field_mode)
    for field_name, field_type, field_mode in _STOCK_FIELDS
]

def create_dataset():
    """Create the BigQuery dataset `dataset_id` (location "US") if it is missing.

    `client.get_dataset` signals a missing dataset by raising `NotFound`, not by
    returning it, so the existence check has to be a try/except. Comparing the
    return value to the `NotFound` class never matched, and a missing dataset
    propagated as an exception instead of being created.

    Returns None. Errors other than `NotFound` are left to propagate.
    """
    try:
        client.get_dataset(dataset_id)
    except NotFound:
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"
        # exists_ok keeps this safe if the dataset appears between check and create.
        client.create_dataset(dataset, exists_ok=True)

def create_stocks_table():
    """Ensure the stocks table exists, then load DATA_FILE into it.

    The table `table_id` is created from TABLE_SCHEMA when absent (exists_ok=True),
    after which the local parquet file is uploaded through a load job configured
    with CREATE_NEVER / WRITE_TRUNCATE dispositions. Blocks until the job finishes.
    """
    # The load job below uses CREATE_NEVER, so the destination is created here first.
    client.create_table(bigquery.Table(table_id, schema=TABLE_SCHEMA), exists_ok=True)

    load_options = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        autodetect=True,
        create_disposition='CREATE_NEVER',
        write_disposition='WRITE_TRUNCATE',
        ignore_unknown_values=True,
    )

    with open(DATA_FILE, "rb") as parquet_handle:
        load_job = client.load_table_from_file(
            parquet_handle,
            table_id,
            job_config=load_options,
        )

    # Wait for completion; a failed load surfaces here as an exception.
    load_job.result()

# Run the load: make sure the dataset exists first, then create/load the stocks table in it.
create_dataset()
create_stocks_table()

In [183]:
import yaml
from airflow.models import Variable
from airflow.hooks.filesystem import FSHook

_default_config_path = './config.yml'
# The Airflow Variable `config_file` overrides the default path when it is set.
CONF_PATH = Variable.get('config_file', default_var=_default_config_path)
config: dict = {}
with open(CONF_PATH) as open_yaml:
    # NOTE(review): yaml.full_load accepts more than plain scalars/lists/maps;
    # if the config only needs plain types, yaml.safe_load is the safer choice.
    # An empty file parses to None, so fall back to an empty dict to keep
    # `config` usable as a mapping.
    config = yaml.full_load(open_yaml) or {}

[[34m2023-02-14 10:36:12,691[0m] {[34mvariable.py:[0m272} ERROR[0m - Unable to retrieve variable from secrets backend (MetastoreBackend). Checking subsequent secrets backend.[0m
Traceback (most recent call last):
  File "/Users/Ruben/Desktop/team-week3/venv/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1706, in _execute_context
    cursor, statement, parameters, context
  File "/Users/Ruben/Desktop/team-week3/venv/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
sqlite3.OperationalError: no such table: variable

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Ruben/Desktop/team-week3/venv/lib/python3.7/site-packages/airflow/models/variable.py", line 265, in get_variable_from_secrets
    var_val = secrets_backend.get_variable(key=key)
  File "/Users/Ruben/Desktop/team-week3/venv/lib/python3.7/site-packages/airflow/utils/

In [188]:
# Sanity check: read the consolidated parquet output back and preview it.
data_dir = '../data/all_tech_stocks.parquet'
# Match any parquet data file in the directory. The Spark part file is renamed to
# 'stocks.parquet' by the BigQuery load cell, after which the narrower
# '*.snappy.parquet' pattern no longer matches anything.
filepath = '*.parquet'
test = sparkql.read.parquet(os.path.join(data_dir,filepath))

test.show()

+--------------+----------+----------+--------+--------+--------+--------+---------+---------+
|         sd_id|stock_name|      date|    open|    high|     low|   close|adj_close|   volume|
+--------------+----------+----------+--------+--------+--------+--------+---------+---------+
|TSLA2010-06-30|      TSLA|2010-06-30|1.719333|2.028000|1.553333|1.588667| 1.588667|257806500|
|TSLA2010-07-01|      TSLA|2010-07-01|1.666667|1.728000|1.351333|1.464000| 1.464000|123282000|
|TSLA2010-07-02|      TSLA|2010-07-02|1.533333|1.540000|1.247333|1.280000| 1.280000| 77097000|
|TSLA2010-07-06|      TSLA|2010-07-06|1.333333|1.333333|1.055333|1.074000| 1.074000|103003500|
|TSLA2010-07-07|      TSLA|2010-07-07|1.093333|1.108667|0.998667|1.053333| 1.053333|103825500|
|TSLA2010-07-08|      TSLA|2010-07-08|1.076000|1.168000|1.038000|1.164000| 1.164000|115671000|
|TSLA2010-07-09|      TSLA|2010-07-09|1.172000|1.193333|1.103333|1.160000| 1.160000| 60759000|
|TSLA2010-07-12|      TSLA|2010-07-12|1.196667|1.2