In [116]:
import pandas as pd
import json


In [168]:
# BigQuery project that owns the destination tables.
project_id = "is3107-news"

# Raw input file for each destination table (keyed by BigQuery table name).
TableId_FilePath_dict = {
    "Tickers": "../company_news/tickers.json",
    "Company": "../Company/Company.csv",
    "FinanceSituation": "../Finance_situation/Finance_situation.csv",
    "StockData": "../stock_price/stock_price.csv",
    "News": "../company_news/news2.csv",
}

# File that is actually uploaded for each table, i.e. the path after the
# transformation step. Tickers, FinanceSituation and News point at new files;
# Company and StockData reuse the raw path.
TableId_FilePath_dict_trans = {
    "Tickers": "../company_news/tickers.csv",
    "Company": "../Company/Company.csv",
    "FinanceSituation": "../Finance_situation/Finance_situation_trans.csv",
    "StockData": "../stock_price/stock_price.csv",
    "News": "../company_news/news2_trans.csv",
}

# Primary-key column(s) of each table; merge_data uses them to build the
# ON clause of its MERGE statement.
TableId_PrimaryKey_dict = {
    "Tickers": ["ticker", "news_id"],
    "Company": ["id"],
    "FinanceSituation": ["company_id"],
    "StockData": ["Ticker", "Date"],
    "News": ["id"],
}

Build the stock–news relation table

In [118]:
relation_columns = ["ticker", "news_id"]

# The JSON file is read as a mapping whose keys are tickers and whose values
# are iterables of news ids.
with open(TableId_FilePath_dict["Tickers"], 'r') as file:
    data = json.load(file)

# Flatten the mapping into one [ticker, news_id] pair per row.
rows = [
    [stock, news_id]
    for stock, news_list in data.items()
    for news_id in news_list
]

df_relation = pd.DataFrame(rows, columns=relation_columns)
df_relation.to_csv(TableId_FilePath_dict_trans["Tickers"], index=False)

In [119]:
# Inspect the flattened ticker / news_id relation table.
df_relation

Unnamed: 0,ticker,news_id
0,GOOGL,175557073
1,GOOGL,175592149
2,GOOGL,175725585
3,GOOGL,175559319
4,GOOGL,175585505
...,...,...
36895,NRG,175582467
36896,NRG,175577659
36897,NRG,175585095
36898,NRG,175577749


Data transformation

In [120]:
# Keep only Ticker, Date and log_return; this drops the index column in stock_price.csv.
# NOTE: the raw and transformed paths for StockData are the same file, so it is rewritten in place.
stock_columns = ["Ticker", "Date", "log_return"]
df_stock_price = pd.read_csv(TableId_FilePath_dict["StockData"], usecols=stock_columns)
df_stock_price.to_csv(TableId_FilePath_dict_trans["StockData"], index=False)

In [121]:
# news csv
# filter chinese character
df_news=pd.read_csv(TableId_FilePath_dict["News"])

# Parse publish_date and keep only the calendar date (time of day is dropped).
df_news["publish_date"]=pd.to_datetime(df_news["publish_date"]).dt.date
# No intermediate write here: the transformed News file is written once,
# after filtering and de-duplication, at the end of this cell. Writing the
# unfiltered frame to the same path first was redundant and would leave
# unfiltered data on disk if a later step in the cell failed.

def contains_chinese(text):
    """Return True if `text` contains a character in the range U+4E00..U+9FFF.

    Args:
        text: The value to inspect. Expected to be a str; any other value
            (for example the float NaN pandas uses for a missing cell) is
            treated as containing no Chinese characters instead of raising
            TypeError when iterated.

    Returns:
        bool: True when at least one character falls in U+4E00..U+9FFF,
        otherwise False.
    """
    if not isinstance(text, str):
        return False
    return any('\u4e00' <= char <= '\u9fff' for char in text)

# Rows whose title has no Chinese characters (not written out in this cell).
df_filter_title = df_news.loc[~df_news["title"].apply(contains_chinese)]

# Rows whose text has no Chinese characters, keeping the first row per id;
# this is the frame saved as the transformed News file.
df_filter_text = (
    df_news.loc[~df_news["text"].apply(contains_chinese)]
    .drop_duplicates(subset='id', keep="first")
)
df_filter_text.to_csv(TableId_FilePath_dict_trans["News"], index=False)


In [122]:
# (rows, columns) of the filtered, de-duplicated news frame.
df_filter_text.shape

(97, 11)

In [123]:
#finance_situation.csv
df_fin=pd.read_csv(TableId_FilePath_dict["FinanceSituation"])
# Divide debtToEquity by 100 (presumably percent -> ratio; confirm against the
# data source) and replace missing values with 0.
# Assigned back explicitly: calling fillna(inplace=True) on df_fin["col"] is a
# chained in-place update, which does not modify df_fin under pandas
# Copy-on-Write.
df_fin["debtToEquity"]=(df_fin["debtToEquity"]/100).fillna(0)
df_fin.to_csv(TableId_FilePath_dict_trans["FinanceSituation"],index=False)

Load data to cloud

In [124]:
from google.cloud import bigquery

# No project or credentials are passed, so the client infers both from the environment.
client = bigquery.Client()

dataset_id = 'raw_data'
# Client.dataset() is deprecated; build the reference directly. client.project
# is the project client.dataset(dataset_id) defaulted to, so the resulting
# reference is the same.
dataset_ref = bigquery.DatasetReference(client.project, dataset_id)

In [None]:
def merge_data(tmp_table_id,target_table_id):
    """Insert rows of a staging table into the target table when their key is new.

    Runs a BigQuery MERGE that matches rows on the target table's primary-key
    columns and inserts every source row that has no match. Rows that already
    exist in the target are left untouched.

    Args:
        tmp_table_id: Fully qualified id of the staging (source) table.
        target_table_id: Fully qualified id of the target table. The part
            after the last '.' is used to look up the primary-key columns in
            TableId_PrimaryKey_dict.

    Raises:
        KeyError: The target table name has no entry in TableId_PrimaryKey_dict.
        ValueError: The entry exists but lists no key columns.
    """
    primarykey_list=TableId_PrimaryKey_dict[target_table_id.split('.')[-1]]

    if not primarykey_list:
        raise ValueError(f"No primary key columns configured for {target_table_id}")

    # One equality per key column, so any number of key columns is supported
    # (previously only 1 or 2 worked; anything else left the clause undefined).
    primarykey_statement=" AND ".join(
        f"target.{key} = source.{key}" for key in primarykey_list
    )

    merge_statement=f"""
    MERGE `{target_table_id}` AS target
    USING `{tmp_table_id}` AS source
    ON ({primarykey_statement})
    WHEN NOT MATCHED THEN
        INSERT ROW
    """

    query_job = client.query(merge_statement)

    # Wait for the job to finish
    query_job.result()

def load_data(table_id,file_path):
    """Load a CSV file into a BigQuery table via a temporary staging table.

    The file is uploaded to `<table_id>_tmp` with an auto-detected schema
    (first row skipped as the header), merged into `table_id` with
    merge_data, and the staging table is then dropped. Finally the row and
    column counts of the target table are printed.

    Args:
        table_id: Fully qualified id of the target table.
        file_path: Path of the CSV file to upload.

    Raises:
        Whatever the file open, the load job or merge_data raises. The
        staging table is dropped in every case.
    """
    # this is autodetect schema mode
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        # Replace any staging table left over from an interrupted run rather
        # than appending to it.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    #upload the tmp table
    tmp_table_id=table_id+"_tmp"
    try:
        with open(file_path, "rb") as source_file:
            job = client.load_table_from_file(source_file, tmp_table_id, job_config=job_config)
        job.result()  # Waits for the job to complete.

        merge_data(tmp_table_id,table_id)
    finally:
        # Drop the tmp table even when the load or merge fails, so it does not
        # leak; not_found_ok covers failures before the table was created.
        client.delete_table(tmp_table_id, not_found_ok=True)

    table = client.get_table(table_id)  # Make an API request.
    print(
        "Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), table_id
        )
    )

In [None]:
# Upload every transformed file into its table in the dataset.
for table_id, file_path in TableId_FilePath_dict_trans.items():
    print(f"Load data: {table_id}:")
    full_table_id = f"{project_id}.{dataset_id}.{table_id}"
    load_data(full_table_id, file_path)

**SQL**

DELETE FROM `is3107-news.raw_data.News`

WHERE id IN (

    SELECT id

    FROM `is3107-news.raw_data.News`

    LIMIT 10 -- delete the first 10 rows

);

In [170]:
# Print the id of every table in the dataset.
tables = client.list_tables(dataset_ref)
for table in tables:
    print(f"Table ID: {table.table_id}")

Table ID: Company
Table ID: FinanceSituation
Table ID: Industry
Table ID: News
Table ID: StockData
Table ID: Tickers
Table ID: raw_news
Table ID: raw_news_test


In [171]:
# Check all the tables: for each one print its id, its schema and a 5-row preview.
tables = client.list_tables(dataset_ref)

print(f"Tables contained in '{dataset_id}':")
for table_item in tables:
    print(f"Table ID: {table_item.table_id}")

    # Fetch the table object so its schema can be read.
    table_ref = dataset_ref.table(table_item.table_id)
    table = client.get_table(table_ref)

    # Column names and types
    print(f"Schema of '{table.table_id}':")
    for field in table.schema:
        print(f"Column name: {field.name}, Column type: {field.field_type}")

    # First few rows of the table
    preview = client.list_rows(table, max_results=5).to_dataframe()
    print(f"Preview of the first few rows from '{table.table_id}':")
    print(preview)
    print("\n")

Tables contained in 'raw_data':
Table ID: Company
Schema of 'Company':
Column name: id, Column type: STRING
Column name: name, Column type: STRING
Column name: fullTimeEmployees, Column type: FLOAT
Column name: Industry, Column type: STRING
Column name: Country, Column type: STRING
Preview of the first few rows from 'Company':
     id                             name  fullTimeEmployees  \
0  LULU         lululemon athletica inc.            38000.0   
1   MDT                   Medtronic plc.            95000.0   
2   STE             STERIS plc (Ireland)            17000.0   
3   ACN                    Accenture plc           742000.0   
4   STX  Seagate Technology Holdings PLC            33400.0   

            Industry  Country  
0  Consumer Cyclical   Canada  
1         Healthcare  Ireland  
2         Healthcare  Ireland  
3         Technology  Ireland  
4         Technology  Ireland  


Table ID: FinanceSituation
Schema of 'FinanceSituation':
Column name: company_id, Column type: STR

In [130]:
#delete table

# for table_id,file_path in TableId_FilePath_dict_trans.items():
#     full_table_id = f"{project_id}.{dataset_id}.{table_id}"
#     client.delete_table(full_table_id)