In [1]:
import pandas as pd
import os
import json
from google.oauth2 import service_account
from google.cloud import bigquery
import pyarrow

In [2]:
# Read the service-account key (a JSON document) from the environment.
service_account_key = os.getenv('GCP_ACCESS_KEY')
if not service_account_key:
    # Name the variable that is actually read so the failure is actionable.
    raise ValueError("The GCP_ACCESS_KEY environment variable is not set")

# Parse the JSON text into the mapping passed to from_service_account_info.
key_data = json.loads(service_account_key)

# Build credentials scoped to the Google Cloud Platform API.
credentials = service_account.Credentials.from_service_account_info(
    key_data,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

In [3]:
# Project that owns the destination table. Fail fast when the variable is
# missing: os.getenv would return None and the later string concatenation that
# builds the table name would raise an unhelpful TypeError instead.
project_id = os.getenv('GOOGLE_PROJECT_MAIN_FP')
if not project_id:
    raise ValueError("The GOOGLE_PROJECT_MAIN_FP environment variable is not set")

In [4]:
# BigQuery client authenticated with the service-account credentials; the
# client's project is taken from the credentials object itself.
client = bigquery.Client(
    project=credentials.project_id,
    credentials=credentials,
)

In [5]:
# Read the AAPL price CSV into a DataFrame.
# NOTE(review): absolute workspace path; this only resolves inside that checkout.
df = pd.read_csv(
    filepath_or_buffer='/workspaces/apache-airflow-project/Airflow_Basic/Back_Fill/dataset/AAPL.csv',
)

In [6]:
# Preview the first rows of the loaded price data.
df.head()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2022-12-27,131.380005,131.410004,128.720001,130.029999,129.307236,69007800
1,2022-12-28,129.669998,131.029999,125.870003,126.040001,125.339417,85438400
2,2022-12-29,127.989998,130.479996,127.730003,129.610001,128.889557,75703700
3,2022-12-30,128.410004,129.949997,127.43,129.929993,129.207794,77034200
4,2023-01-03,130.279999,130.899994,124.169998,125.07,124.374802,112117500


In [7]:
# Inspect dtypes and non-null counts of the raw frame (Date is still an object column here).
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 250 entries, 0 to 249
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Date       250 non-null    object 
 1   Open       250 non-null    float64
 2   High       250 non-null    float64
 3   Low        250 non-null    float64
 4   Close      250 non-null    float64
 5   Adj Close  250 non-null    float64
 6   Volume     250 non-null    int64  
dtypes: float64(5), int64(1), object(1)
memory usage: 13.8+ KB


In [8]:
# Convert the Date strings to pandas datetimes, replacing the column in place.
df['Date'] = df['Date'].pipe(pd.to_datetime)

In [10]:
# Rename 'Adj Close' to 'Adj_Close' so the column name contains no space.
df.rename({'Adj Close': 'Adj_Close'}, axis='columns', inplace=True)

In [11]:
# Re-check dtypes and column names after the Date conversion and the rename.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 250 entries, 0 to 249
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   Date       250 non-null    datetime64[ns]
 1   Open       250 non-null    float64       
 2   High       250 non-null    float64       
 3   Low        250 non-null    float64       
 4   Close      250 non-null    float64       
 5   Adj_Close  250 non-null    float64       
 6   Volume     250 non-null    int64         
dtypes: datetime64[ns](1), float64(5), int64(1)
memory usage: 13.8 KB


In [21]:
def pd_data_load(file_path):
    """Read a stock-price CSV and prepare it for loading.

    Converts the ``Date`` column to datetimes, renames ``Adj Close`` to
    ``Adj_Close``, prints the first rows, and returns the frame.

    Args:
        file_path: Location of the CSV file, passed straight to ``pd.read_csv``.

    Returns:
        The prepared ``pandas.DataFrame``.

    Raises:
        KeyError: If the CSV has no ``Date`` column.
    """
    prices = (
        pd.read_csv(file_path)
        .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
        .rename(columns={'Adj Close': 'Adj_Close'})
    )
    print(prices.head())
    return prices


In [25]:
# Base table name in <project>.<dataset>.<table> form; a per-period suffix is
# appended later by bigquery_load.
bq_table_name = ''.join((project_id, '.test_table.stock_price_data'))

In [26]:
# Display the assembled table name as a quick sanity check.
bq_table_name

'friendly-plane-294914.test_table.stock_price_data'

In [29]:
def bigquery_load(table_name, df,date_period):
    """Load a DataFrame into a period-suffixed BigQuery table.

    The destination is ``<table_name>_<date_period>``. The load uses an
    explicit schema and the ``WRITE_TRUNCATE`` write disposition, and the
    call waits for the load job before returning. Uses the module-level
    ``client``.

    Args:
        table_name: Base table name as a string.
        df: DataFrame with the columns Date, Open, High, Low, Close,
            Adj_Close and Volume.
        date_period: Suffix appended to the table name (converted with ``str``).

    Returns:
        None.
    """
    destination = '_'.join((table_name, str(date_period)))

    # Column name -> BigQuery type, in the order the schema is declared.
    column_types = (
        ("Date", "DATE"),
        ("Open", "FLOAT"),
        ("High", "FLOAT"),
        ("Low", "FLOAT"),
        ("Close", "FLOAT"),
        ("Adj_Close", "FLOAT"),
        ("Volume", "INTEGER"),
    )
    load_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField(column, bq_type)
            for column, bq_type in column_types
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    load_job = client.load_table_from_dataframe(df, destination, job_config=load_config)
    # Wait for the load job to complete before returning.
    load_job.result()



In [31]:

# Load the prepared frame into the table suffixed with the '20200102' period.
bigquery_load(table_name=bq_table_name, df=df, date_period='20200102')