In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=8a2b33e219f4dc41f3ea1ea7114c58bbdf768c0367a07374736055ce60869b9f
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2023-12-26 12:34:15--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2023-12-26 12:34:15 (6.65 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
!unzip archive_etl.zip

Archive:  archive_etl.zip
  inflating: electricity_access_percent.csv  
  inflating: gdp_data.csv            
  inflating: mystery.csv             
  inflating: population_data.csv     
  inflating: population_data.db      
  inflating: population_data.json    
  inflating: population_data.xml     
  inflating: projects_data.csv       
  inflating: rural_population_percent.csv  


# Extract

In [5]:
import os
import csv
from pyspark.sql import SparkSession
import xml.etree.ElementTree as ET

class DataCleaner:
    def __init__(self, debug= False):
        self.debug = debug
        if self.debug: print("\t ---- DEBUG MODE ON ----")
        self.cleaning_folder = "cleaned_files"
        os.makedirs(self.cleaning_folder, exist_ok=True)

    def remove_lines_from_csv(self, input_path, output_path, lines_to_remove=4):
        with open(input_path, "r") as input_file, open(output_path, "w") as output_file:
            for _ in range(lines_to_remove):
                next(input_file)
            output_file.write(input_file.read())
        if self.debug: print(f"Removed first 4 lines from {input_path} and saved to {output_path}")

    def detect_encoding(self, file_path):
        import chardet
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read())
        return result['encoding']

    def resolve_encoding(self, input_path, output_path, encoding_from, encoding_to):
        with open(input_path, "r", encoding=encoding_from) as input_file, open(output_path, "w", encoding=encoding_to) as output_file:
            output_file.write("index")
            output_file.write(input_file.read())
        if self.debug: print(f"Encoding of {input_path} changed from {encoding_from} to {encoding_to}")

    def xml_to_csv(self, xml_file_path, csv_file_path):
        tree = ET.parse(xml_file_path)
        root = tree.getroot()

        with open(csv_file_path, 'w', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            header = [field.attrib['name'] for field in root.find('.//record').iter('field')]
            header[0] = "Country Name"   ######## .replaced "Country or Area" to "Country Name" in header
            csv_writer.writerow(header)

            for record in root.findall('.//record'):
                row = [field.text for field in record.iter('field')]
                csv_writer.writerow(row)
        if self.debug: print(f"converted {xml_file_path} to {csv_file_path}")

    def convert_db_to_csv(self, filename, tablename):
        import sqlite3
        import csv
        conn = sqlite3.connect(filename)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM "+tablename)
        data = cursor.fetchall()

        csv_file_path = 'cleaned_files/cleaned_'+tablename+"_db.csv"
        with open(csv_file_path, 'w', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow([desc[0].replace("_"," ") for desc in cursor.description])
            csvwriter.writerows(data)

        conn.close()
        if self.debug: print(f"converted {tablename} of {filename} to {csv_file_path}")

    def spark_read_csv(self, file_path): return spark.read.csv(file_path, header=True, inferSchema=True)
    def spark_read_json(self, file_path): return spark.read.json(file_path)

# Main
if __name__ == "__main__":

    debug = True                    ##### YOU CAN SWITCH DEBUGGER HERE #####

    cleaner = DataCleaner(debug = debug)

    # CSV (4 line removal)
    csv_files = ["electricity_access_percent.csv", "gdp_data.csv", "population_data.csv", 'rural_population_percent.csv']
    for csv_file in csv_files:
        input_path = csv_file
        output_path = os.path.join(cleaner.cleaning_folder, f"cleaned_{csv_file}")
        cleaner.remove_lines_from_csv(input_path, output_path)

    # CSV (Encoding issue)
    mystery_encoding = cleaner.detect_encoding("mystery.csv")
    cleaner.resolve_encoding("mystery.csv", os.path.join(cleaner.cleaning_folder, "cleaned_mystery.csv"), mystery_encoding, "utf-8")

    # Starting Spark
    spark = SparkSession.builder.appName("ETL").getOrCreate()

    # Read CSV files
    electricity_access_percent_df = cleaner.spark_read_csv('cleaned_files/cleaned_electricity_access_percent.csv')
    gdp_data_df = cleaner.spark_read_csv('cleaned_files/cleaned_gdp_data.csv')
    rural_population_percent_df = cleaner.spark_read_csv('cleaned_files/cleaned_rural_population_percent.csv')

    if debug: cleaner.detect_encoding("mystery.csv")
    mystery_df = cleaner.spark_read_csv('cleaned_files/cleaned_mystery.csv')
    if debug: cleaner.detect_encoding('cleaned_files/cleaned_mystery.csv')

    population_data_csv_df = cleaner.spark_read_csv('cleaned_files/cleaned_population_data.csv')

    # Convert DB to CSV
    cleaner.convert_db_to_csv(filename = 'population_data.db', tablename = "population_data")
    population_data_db_df = cleaner.spark_read_csv('cleaned_files/cleaned_population_data_db.csv')

    # Read JSON file
    population_data_json_df = cleaner.spark_read_json('population_data.json')

    # Read XML file
    cleaner.xml_to_csv("population_data.xml", "cleaned_files/cleaned_population_data_xml.csv")
    population_data_xml_df = cleaner.spark_read_csv('cleaned_files/cleaned_population_data_xml.csv')

    # Read other CSV files
    projects_data_xml_df = cleaner.spark_read_csv('projects_data.csv')

	 ---- DEBUG MODE ON ----
Removed first 4 lines from electricity_access_percent.csv and saved to cleaned_files/cleaned_electricity_access_percent.csv
Removed first 4 lines from gdp_data.csv and saved to cleaned_files/cleaned_gdp_data.csv
Removed first 4 lines from population_data.csv and saved to cleaned_files/cleaned_population_data.csv
Removed first 4 lines from rural_population_percent.csv and saved to cleaned_files/cleaned_rural_population_percent.csv
Encoding of mystery.csv changed from UTF-16 to utf-8
converted population_data of population_data.db to cleaned_files/cleaned_population_data_db.csv
converted population_data.xml to cleaned_files/cleaned_population_data_xml.csv


In [7]:
# electricity_access_percent_df.show()
# gdp_data_df.show()
# mystery_df.show()
# population_data_csv_df.show()
# population_data_db_df.show()
# population_data_json_df.show()
# population_data_xml_df.show()
# projects_data_xml_df.show()
# rural_population_percent_df.show()

# Transform

Replacements done in above extract code:

- fixed headers of db dataframe. `desc[0].replace("_"," ")`
- added index as header in mystry df `output_file.write("index")`
- replaced header `population_data_xml_df` in `header[0] = "Country Name"`
- Lot of cleaning required in `projects` (SKIPPING THIS CLEANING AND DIRECTLY PROCEED TO LOADING)
- Tried getting jar file for jdbc connection (downloaded it and added to spark session)

In [8]:
def merge_dfs (*dfs):
  combined_df = dfs[1]
  for df in dfs[1:]:
    combined_df = combined_df.unionByName(df, allowMissingColumns = True).distinct()
  return combined_df

In [9]:
population_combined_df = merge_dfs(population_data_csv_df, population_data_db_df, population_data_json_df, population_data_xml_df)
population_combined_df.show()

+-----+--------------------+------------+-----------------+--------------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+-------------+-------------+-------------+------------+-------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+------------+-------------+-------------+-------------+-------------+----+----+-----+
|index|        Country Name|Country Code|   Indicator Name|Indicator Code|        1960|        1961|        1

In [11]:
def showall():
  print("\telectricity_access_percent_df.show()")
  electricity_access_percent_df.show()

  print("\tgdp_data_df.show()")
  gdp_data_df.show()

  print("\tmystery_df.show()")
  mystery_df.show()

  print("\tpopulation_data_csv_df.show()")
  population_data_csv_df.show()

  print("\tpopulation_data_db_df.show()")
  population_data_db_df.show()

  print("\tpopulation_data_json_df.show()")
  population_data_json_df.show()

  print("\tpopulation_data_xml_df.show()")
  population_data_xml_df.show()

  print("\tprojects_data_xml_df.show()")
  projects_data_xml_df.show()

  print("\trural_population_percent_df.show()")
  rural_population_percent_df.show()

  print("\tpopulation_combined_df.show()")
  population_combined_df.show()

In [12]:
showall()

	electricity_access_percent_df.show()
+--------------------+------------+--------------------+--------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-----------------+-----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----+----+
|        Country Name|Country Code|      Indicator Name|Indicator Code|1960|1961|1962|1963|1964|1965|1966|1967|1968|1969|1970|1971|1972|1973|1974|1975|1976|1977|1978|1979|1980|1981|1982|1983|1984|1985|1986|1987|1988|1989|               1990|  

# Load (Data Frames to Tables in DB)

In [13]:
import sqlite3
from pyspark.sql.types import StringType, IntegerType, DoubleType

projects_data_df = projects_data_xml_df

# Function to create a table based on the schema of a DataFrame
def create_table(cursor, table_name, df):
    type_mapping = {
        StringType(): 'TEXT',
        IntegerType(): 'INTEGER',
        DoubleType(): 'REAL'
        # Add more mappings as needed
    }

    columns_and_types = [(col.name, type_mapping.get(col.dataType, 'TEXT')) for col in df.schema]
    columns_definition = ', '.join([f'"{col}" {data_type}' for col, data_type in columns_and_types])
    create_table_query = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_definition})'

    if debug: print(f'Generated query: {create_table_query}')  # Print the query for debugging purposes

    cursor.execute(create_table_query)


# Function to insert data into the table
def insert_data(cursor, table_name, df):
    insert_data_query = f'INSERT INTO {table_name} VALUES ({", ".join(["?"] * len(df.columns))})'
    if debug: print(f'Generated query: {insert_data_query}')  # Print the query for debugging purposes
    for row in df.collect():
        cursor.execute(insert_data_query, tuple(row))

# Connect to SQLite and create tables
connection = sqlite3.connect('combined_data_db.sqlite')
cursor = connection.cursor()

create_table(cursor, 'population_data', population_combined_df)
create_table(cursor, 'gdp_data', gdp_data_df)
create_table(cursor, 'electricity_access_percent', electricity_access_percent_df)
create_table(cursor, 'rural_population_percent', rural_population_percent_df)
create_table(cursor, 'mystery', mystery_df)
create_table(cursor, 'projects_data', projects_data_df)

# Insert data into tables
insert_data(cursor, 'population_data', population_combined_df)
insert_data(cursor, 'gdp_data', gdp_data_df)
insert_data(cursor, 'electricity_access_percent', electricity_access_percent_df)
insert_data(cursor, 'rural_population_percent', rural_population_percent_df)
insert_data(cursor, 'mystery', mystery_df)
insert_data(cursor, 'projects_data', projects_data_df)

# Commit the changes and close the connection
connection.commit()
connection.close()

Generated query: CREATE TABLE IF NOT EXISTS population_data ("index" INTEGER, "Country Name" TEXT, "Country Code" TEXT, "Indicator Name" TEXT, "Indicator Code" TEXT, "1960" REAL, "1961" REAL, "1962" REAL, "1963" REAL, "1964" REAL, "1965" REAL, "1966" REAL, "1967" REAL, "1968" REAL, "1969" REAL, "1970" REAL, "1971" REAL, "1972" REAL, "1973" REAL, "1974" REAL, "1975" REAL, "1976" REAL, "1977" REAL, "1978" REAL, "1979" REAL, "1980" REAL, "1981" REAL, "1982" REAL, "1983" REAL, "1984" REAL, "1985" REAL, "1986" REAL, "1987" REAL, "1988" REAL, "1989" REAL, "1990" REAL, "1991" REAL, "1992" REAL, "1993" REAL, "1994" REAL, "1995" REAL, "1996" REAL, "1997" REAL, "1998" REAL, "1999" REAL, "2000" REAL, "2001" REAL, "2002" REAL, "2003" REAL, "2004" REAL, "2005" REAL, "2006" REAL, "2007" REAL, "2008" REAL, "2009" REAL, "2010" REAL, "2011" REAL, "2012" REAL, "2013" REAL, "2014" REAL, "2015" REAL, "2016" REAL, "2017" REAL, "Item" TEXT, "Year" INTEGER, "Value" TEXT)
Generated query: CREATE TABLE IF NOT 

# END

In [14]:
# spark.stop()

AttributeError: ignored

# TEST COMPLETE CODE

In [16]:
import os
import csv
from pyspark.sql import SparkSession
import xml.etree.ElementTree as ET

class DataCleaner:
    def __init__(self, debug= False):
        self.debug = debug
        if self.debug: print("\t ---- DEBUG MODE ON ----")
        self.cleaning_folder = "cleaned_files"
        os.makedirs(self.cleaning_folder, exist_ok=True)

    def remove_lines_from_csv(self, input_path, output_path, lines_to_remove=4):
        with open(input_path, "r") as input_file, open(output_path, "w") as output_file:
            for _ in range(lines_to_remove):
                next(input_file)
            output_file.write(input_file.read())
        if self.debug: print(f"Removed first 4 lines from {input_path} and saved to {output_path}")

    def detect_encoding(self, file_path):
        import chardet
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read())
        return result['encoding']

    def resolve_encoding(self, input_path, output_path, encoding_from, encoding_to):
        with open(input_path, "r", encoding=encoding_from) as input_file, open(output_path, "w", encoding=encoding_to) as output_file:
            output_file.write("index")
            output_file.write(input_file.read())
        if self.debug: print(f"Encoding of {input_path} changed from {encoding_from} to {encoding_to}")

    def xml_to_csv(self, xml_file_path, csv_file_path):
        tree = ET.parse(xml_file_path)
        root = tree.getroot()

        with open(csv_file_path, 'w', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            header = [field.attrib['name'] for field in root.find('.//record').iter('field')]
            header[0] = "Country Name"   ######## .replaced "Country or Area" to "Country Name" in header
            csv_writer.writerow(header)

            for record in root.findall('.//record'):
                row = [field.text for field in record.iter('field')]
                csv_writer.writerow(row)
        if self.debug: print(f"converted {xml_file_path} to {csv_file_path}")

    def convert_db_to_csv(self, filename, tablename):
        import sqlite3
        import csv
        conn = sqlite3.connect(filename)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM "+tablename)
        data = cursor.fetchall()

        csv_file_path = 'cleaned_files/cleaned_'+tablename+"_db.csv"
        with open(csv_file_path, 'w', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow([desc[0].replace("_"," ") for desc in cursor.description])
            csvwriter.writerows(data)

        conn.close()
        if self.debug: print(f"converted {tablename} of {filename} to {csv_file_path}")

    def spark_read_csv(self, file_path): return spark.read.csv(file_path, header=True, inferSchema=True)
    def spark_read_json(self, file_path): return spark.read.json(file_path)

    def merge_dfs (self, *dfs):
        combined_df = dfs[1]
        for df in dfs[1:]:
            combined_df = combined_df.unionByName(df, allowMissingColumns = True).distinct()
        if debug: print("combined ", dfs, "dataframes")
        return combined_df

    def showall(self):
        try:
            print("\telectricity_access_percent_df.show()")
            electricity_access_percent_df.show()

            print("\tgdp_data_df.show()")
            gdp_data_df.show()

            print("\tmystery_df.show()")
            mystery_df.show()

            print("\tpopulation_data_csv_df.show()")
            population_data_csv_df.show()

            print("\tpopulation_data_db_df.show()")
            population_data_db_df.show()

            print("\tpopulation_data_json_df.show()")
            population_data_json_df.show()

            print("\tpopulation_data_xml_df.show()")
            population_data_xml_df.show()

            print("\tprojects_data_xml_df.show()")
            projects_data_xml_df.show()

            print("\trural_population_percent_df.show()")
            rural_population_percent_df.show()

            print("\tpopulation_combined_df.show()")
            population_combined_df.show()
        except: pass

    def load(self, output_file):
        import sqlite3
        from pyspark.sql.types import StringType, IntegerType, DoubleType

        projects_data_df = projects_data_xml_df

        # Function to create a table based on the schema of a DataFrame
        def create_table(cursor, table_name, df):
            type_mapping = {
                StringType(): 'TEXT',
                IntegerType(): 'INTEGER',
                DoubleType(): 'REAL'
            }

            columns_and_types = [(col.name, type_mapping.get(col.dataType, 'TEXT')) for col in df.schema]
            columns_definition = ', '.join([f'"{col}" {data_type}' for col, data_type in columns_and_types])
            create_table_query = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_definition})'

            if debug: print(f'Generated query: {create_table_query}')  # Print the query for debugging purposes

            cursor.execute(create_table_query)


        # Function to insert data into the table
        def insert_data(cursor, table_name, df):
            insert_data_query = f'INSERT INTO {table_name} VALUES ({", ".join(["?"] * len(df.columns))})'
            if debug: print(f'Generated query: {insert_data_query}')  # Print the query for debugging purposes
            for row in df.collect():
                cursor.execute(insert_data_query, tuple(row))

        # Connect to SQLite and create tables
        connection = sqlite3.connect(output_file)
        cursor = connection.cursor()

        create_table(cursor, 'population_data', population_combined_df)
        create_table(cursor, 'gdp_data', gdp_data_df)
        create_table(cursor, 'electricity_access_percent', electricity_access_percent_df)
        create_table(cursor, 'rural_population_percent', rural_population_percent_df)
        create_table(cursor, 'mystery', mystery_df)
        create_table(cursor, 'projects_data', projects_data_df)

        # Insert data into tables
        insert_data(cursor, 'population_data', population_combined_df)
        insert_data(cursor, 'gdp_data', gdp_data_df)
        insert_data(cursor, 'electricity_access_percent', electricity_access_percent_df)
        insert_data(cursor, 'rural_population_percent', rural_population_percent_df)
        insert_data(cursor, 'mystery', mystery_df)
        insert_data(cursor, 'projects_data', projects_data_df)

        # Commit the changes and close the connection
        connection.commit()
        connection.close()

# Main
if __name__ == "__main__":

    debug = True                    ##### YOU CAN SWITCH DEBUGGER HERE #####

    cleaner = DataCleaner(debug = debug)

    if debug: print("\t---- EXTRACTION INITIATED ----\t")
    # CSV (4 line removal)
    csv_files = ["electricity_access_percent.csv", "gdp_data.csv", "population_data.csv", 'rural_population_percent.csv']
    for csv_file in csv_files:
        input_path = csv_file
        output_path = os.path.join(cleaner.cleaning_folder, f"cleaned_{csv_file}")
        cleaner.remove_lines_from_csv(input_path, output_path)

    # CSV (Encoding issue)
    mystery_encoding = cleaner.detect_encoding("mystery.csv")
    cleaner.resolve_encoding("mystery.csv", os.path.join(cleaner.cleaning_folder, "cleaned_mystery.csv"), mystery_encoding, "utf-8")

    # Starting Spark
    spark = SparkSession.builder.appName("ETL").getOrCreate()

    # Read CSV files
    electricity_access_percent_df = cleaner.spark_read_csv('cleaned_files/cleaned_electricity_access_percent.csv')
    gdp_data_df = cleaner.spark_read_csv('cleaned_files/cleaned_gdp_data.csv')
    rural_population_percent_df = cleaner.spark_read_csv('cleaned_files/cleaned_rural_population_percent.csv')

    if debug: cleaner.detect_encoding("mystery.csv")
    mystery_df = cleaner.spark_read_csv('cleaned_files/cleaned_mystery.csv')
    if debug: cleaner.detect_encoding('cleaned_files/cleaned_mystery.csv')

    population_data_csv_df = cleaner.spark_read_csv('cleaned_files/cleaned_population_data.csv')

    # Convert DB to CSV
    cleaner.convert_db_to_csv(filename = 'population_data.db', tablename = "population_data")
    population_data_db_df = cleaner.spark_read_csv('cleaned_files/cleaned_population_data_db.csv')

    # Read JSON file
    population_data_json_df = cleaner.spark_read_json('population_data.json')

    # Read XML file
    cleaner.xml_to_csv("population_data.xml", "cleaned_files/cleaned_population_data_xml.csv")
    population_data_xml_df = cleaner.spark_read_csv('cleaned_files/cleaned_population_data_xml.csv')

    # Read other CSV files
    projects_data_xml_df = cleaner.spark_read_csv('projects_data.csv')

    if debug: print("\t---- EXTRACT COMPLETED \t PROCEDING WITH TRANSFORMATION ----\t")

    population_combined_df = cleaner.merge_dfs(population_data_csv_df, population_data_db_df, population_data_json_df, population_data_xml_df)

    if debug: print("\t---- TRANSFORMATION COMPLETED \t PROCEDING WITH LOADING ----\t")
    cleaner.load(output_file="combined_data_db.sqlite")

    # END SPARK SESSION
    if debug:
        print("\t---- LOADING FINISHED ----\t")
        cleaner.showall()

    spark.stop()

	 ---- DEBUG MODE ON ----
	---- EXTRACTION INITIATED ----	
Removed first 4 lines from electricity_access_percent.csv and saved to cleaned_files/cleaned_electricity_access_percent.csv
Removed first 4 lines from gdp_data.csv and saved to cleaned_files/cleaned_gdp_data.csv
Removed first 4 lines from population_data.csv and saved to cleaned_files/cleaned_population_data.csv
Removed first 4 lines from rural_population_percent.csv and saved to cleaned_files/cleaned_rural_population_percent.csv
Encoding of mystery.csv changed from UTF-16 to utf-8
converted population_data of population_data.db to cleaned_files/cleaned_population_data_db.csv
converted population_data.xml to cleaned_files/cleaned_population_data_xml.csv
	---- EXTRACT COMPLETED 	 PROCEDING WITH TRANSFORMATION ----	
combined  (DataFrame[Country Name: string, Country Code: string, Indicator Name: string, Indicator Code: string, 1960: bigint, 1961: bigint, 1962: bigint, 1963: bigint, 1964: bigint, 1965: bigint, 1966: bigint, 1967: 