## Setup

In [1]:
import pandas as pd
import numpy as np
from datetime import datetime

import findspark
findspark.init()

from pyspark.sql import *
import pyspark.sql.functions as F
from pyspark import SparkContext

## Data Loading

'locations.csv' is a file containing the English names of the prefectures. The IDs for the locations are the same IDs provided in the csv files from the Japan Meteorological Agency.

In [37]:
# Load the ID -> English location-name lookup table. clean_data() reads this
# frame as a global, so this cell must be run before calling it.
prefecture_eng = pd.read_csv('locations.csv')
prefecture_eng.head()

Unnamed: 0,ID,Location
0,401,Wakkanai
1,406,Rumoi
2,407,Asahikawa
3,409,Abashiri
4,412,Sapporo


## Functions for cleaning the .csv files

In [16]:
"""
convert_to_datetime

Converts the int dates to strings.

entry: Integer of the month-date, written in format mmdd.
year: Integer of the year, extracted by the column name. 
"""
def convert_to_datetime(entry, year):
    date_str = str(int(entry)).zfill(4)
    month = str(date_str[:2])
    day = str(date_str[2:])
    return str(year) + '-' + month + '-' + day

In [40]:
"""
clean_data

Cleans the csv file in the format to a data analysis ready dataframe. 

Currently only creates a dataframe for the observed dates. 
No sanity check for checking if locations are matched correctly. The function will not 
No table for the remarks. 

file_path: String for the file path. 
"""

def clean_data(file_path):
    df_jp = pd.read_csv(file_path, encoding='cp932', skiprows = [0]) # Encoding to correctly read Japanese characters
    
    # Sanity check for if the IDs of the prefectures match. 
    # If not, the function will print a warning and stop running. 
    if not df_jp['番号'].equals(prefecture_eng['ID']):
        print('English Prefecture and Japanese Prefectures do not match. Please check the files.')
        return None
    
    # Create a column for the prefectures in English
    df_jp['Site Name'] = prefecture_eng['Location']

    # Drop the prefectures in Japanese and the ID column.
    df_eng = df_jp.drop(['地点名', '番号'], axis=1)
    
    # Remove unncessary columns.
    cols = df_eng.columns.tolist()
    cols = cols[-1:] + cols[:-1]
    df_eng = df_eng[cols]
    
    # Partition the data to two tables, one for the observation dates and the other for remarks.
    df_date = df_eng[df_eng.columns[1:][::2][:-4].insert(0, 'Site Name')]
    df_rmk = df_eng[df_eng.columns[::2][:-2]]
    
    # Convert the times to strings of format yyyy-mm-dd.
    years = df_date.columns[1:]
    for year in years:
        df_date[year] = df_date[year].apply(lambda x: convert_to_datetime(x, year) if x != 0 else 0)
        
    # Check if the observations are still made or not. 
    prev_yr = datetime.today().year -1

    # Get the column for the previous year and if the value is 0, then it is no longer being observed. 
    prev_yr_dates = df_date[str(prev_yr)]
    df_date['Currently Being Observed'] = prev_yr_dates.apply(lambda x: True if x != 0 else False)
    
    # Reorder the columns are return the dataframe. 
    reorder = ['Site Name', 'Currently Being Observed'] + list(years)
    return df_date[reorder]

# Examples

Three examples are provided in the repository under the directory sample_data: 1) sakura-data, 2) ajisai-data, and 3) uguisu-data. The following code is an example of how to run the function and write the cleaned data frame to a csv file.

Note that the function may emit pandas `SettingWithCopyWarning` messages (as in the output below); the data is still parsed correctly.

In [49]:
# Clean the data. clean_data() prints a message and returns None if the
# location IDs in the file do not match those loaded from locations.csv.
cleaned = clean_data("./sample_data/sakura-data/sakura_full_bloom.csv")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_date[year] = df_date[year].apply(lambda x: convert_to_datetime(x, year) if x != 0 else 0)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_date['Currently Being Observed'] = prev_yr_dates.apply(lambda x: True if x != 0 else False)


In [50]:
# Display the cleaned dataframe.
cleaned

Unnamed: 0,Site Name,Currently Being Observed,1953,1954,1955,1956,1957,1958,1959,1960,...,2023,2024,2025,2026,2027,2028,2029,2030,2031,2032
0,Wakkanai,True,1953-05-21,1954-05-17,1955-05-17,1956-05-12,1957-05-18,1958-05-23,1959-05-03,1960-05-19,...,2023-05-03,0,0,0,0,0,0,0,0,0
1,Rumoi,False,1953-05-08,1954-05-11,1955-05-13,1956-05-08,1957-05-10,1958-05-12,0,0,...,0,0,0,0,0,0,0,0,0,0
2,Asahikawa,True,1953-05-11,1954-05-13,1955-05-09,1956-05-07,1957-05-09,1958-05-10,1959-05-08,1960-05-13,...,2023-04-25,0,0,0,0,0,0,0,0,0
3,Abashiri,True,1953-05-24,1954-05-16,1955-05-18,1956-05-09,0,0,1959-05-03,0,...,2023-04-28,0,0,0,0,0,0,0,0,0
4,Sapporo,True,1953-05-07,1954-05-04,1955-05-11,1956-05-04,1957-05-09,0,1959-05-05,1960-05-05,...,2023-04-15,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
97,Miyako-jima,True,0,0,0,0,0,0,0,0,...,2023-01-15,0,0,0,0,0,0,0,0,0
98,Kume-jima,False,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
99,Naha,True,0,0,0,0,0,0,0,0,...,2023-01-07,0,0,0,0,0,0,0,0,0
100,Nago,False,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [51]:
# Write cleaned dataframe to a csv file.
# index=False leaves the row index out of the written file.
cleaned.to_csv("./sample_data/sakura-data/sakura_cleaned.csv", index=False)