In [None]:
!pip install pyspark
!pip install openpyxl

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
from pyspark.sql import SparkSession, functions as F

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.3/317.3 MB 4.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=5c76de2eb3d23b9bd892031f6f39527f7333caeb08d41722c48e0417c7e33658
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
# Start (or reuse) the Spark session used for the address merge
spark = SparkSession.builder.appName("Address Data Merging").getOrCreate()


def _read_year_addresses(year):
    """Read one year's aggregated-address CSV and rename its address column.

    The file is read with a header row and schema inference. The year-specific
    column ``google_standard_address_<year>`` is renamed to ``standard_address``
    so that every year shares the same join key.
    """
    csv_path = f'/content/Copy of {year}_addresses_aggregated_repeated - Sheet1.csv'
    year_frame = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(csv_path)
    )
    return year_frame.withColumnRenamed(f"google_standard_address_{year}", "standard_address")


# NOTE: export the CSV files WITHOUT the raw address columns (keep only the
# Google standard address) to avoid delimiter issues when parsing.
df_1945 = _read_year_addresses(1945)
df_1975 = _read_year_addresses(1975)
df_1985 = _read_year_addresses(1985)

# Every distinct standard address that occurs in any of the three years
all_addresses = (
    df_1945.select("standard_address")
    .union(df_1975.select("standard_address"))
    .union(df_1985.select("standard_address"))
    .distinct()
)

# Attach each year's columns to the full address list (addresses missing in a
# year get nulls for that year's columns)
df_merged_1945 = all_addresses.join(df_1945, on="standard_address", how="left")
df_merged_1975 = all_addresses.join(df_1975, on="standard_address", how="left")
df_merged_1985 = all_addresses.join(df_1985, on="standard_address", how="left")

# Stitch the three per-year views together on the shared address key
# NOTE(review): if a standard_address repeats within a year's file, these joins
# multiply rows for that address — confirm the inputs are unique per address.
df_combined = df_merged_1945.join(df_merged_1975, "standard_address", "outer")
df_final = df_combined.join(df_merged_1985, "standard_address", "outer")


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/Copy of 1945_addresses_aggregated_repeated - Sheet1.csv.

In [None]:
# Bring the merged Spark DataFrame onto the driver as a pandas DataFrame
pandas_df = df_final.toPandas()
# index=False: the default pandas RangeIndex carries no information, so writing
# it would only add an unnamed leading column to the CSV. This also matches the
# index=False export used for 'common_in_all.csv' later in this notebook.
pandas_df.to_csv('final.csv', index=False)

In [None]:
# Adding indicator columns for presence of data in each year
# (an address counts as present in a year when that year's ID column is non-null)
df_analysis = (
    df_final
    .withColumn("present_1945", F.col("ID_1945").isNotNull())
    .withColumn("present_1975", F.col("ID_1975").isNotNull())
    .withColumn("present_1985", F.col("ID_1985").isNotNull())
)

# Group by the presence indicators and count
address_presence_summary = df_analysis.groupBy("present_1945", "present_1975", "present_1985").count()

# Show the results
address_presence_summary.show()

# Collect the small summary once and index it by its presence pattern.
# groupBy only emits combinations that actually occur, so a pattern with no
# addresses has no row at all; looking it up with a default of 0 avoids the
# IndexError that `.filter(...).collect()[0]` raises on an empty result, and
# avoids re-running the Spark job once per category.
presence_counts = {
    (row["present_1945"], row["present_1975"], row["present_1985"]): row["count"]
    for row in address_presence_summary.collect()
}


def _presence_count(in_1945, in_1975, in_1985):
    """Return how many addresses have exactly this presence pattern (0 if none)."""
    return presence_counts.get((in_1945, in_1975, in_1985), 0)


# Calculate specific categories based on the summary.
# Each "both" category is exclusive: present in the two named years and absent
# from the third.
all_years = _presence_count(True, True, True)
only_1945 = _presence_count(True, False, False)
only_1975 = _presence_count(False, True, False)
only_1985 = _presence_count(False, False, True)
both_1945_1975 = _presence_count(True, True, False)
both_1975_1985 = _presence_count(False, True, True)
both_1985_1945 = _presence_count(True, False, True)

# Print the counts
print("Addresses found in all years:", all_years)
print("Addresses found only in 1945:", only_1945)
print("Addresses found only in 1975:", only_1975)
print("Addresses found only in 1985:", only_1985)
print("Addresses found in both 1945 and 1975:", both_1945_1975)
print("Addresses found in both 1975 and 1985:", both_1975_1985)
print("Addresses found in both 1985 and 1945:", both_1985_1945)


NameError: name 'df_final' is not defined

In [None]:
# Read the combined-addresses CSV into a pandas DataFrame
common_csv_path = '/content/common addresses new - final.csv'
df = pd.read_csv(common_csv_path)

In [None]:
# Display the column labels of the loaded DataFrame
df.columns

Index(['standard_address', 'latitude', 'longitude', 'one_rep_ID1945',
       'ID_1945', 'people_1945', 'document_number_1945', 'parcel_number_1945',
       'original_grantee_city_or_town_1945', 'aceres_rendered_1945',
       'value_dollars_1945', 'designate_homestead_1945',
       'value_of_city_property_1945', 'value_of_personal_property_1945',
       'total_value_for_state_tax_1945', 'state_tax_1945', 'county_tax_1945',
       'district_school_1945', 'total_tax_1945', '2022_assessed_value_1945',
       'one_rep_ID1975', 'ID_1975', 'people_1975', 'document_number_1975',
       'original_grantee_city_or_town_1975', 'aceres_rendered_1975',
       'value_dollars_(state_value)_1975', 'designate_homestead_1975',
       'value_of_city_property_(total_county_value)_1975',
       'value_of_personal_property_1975', 'total_value_for_county_tax_1975',
       'state_tax_1975', 'county_tax_1975', 'district_school_1975',
       'tax_total_(including_hospital_and_water_taxes)_1975', 'one_rep_ID1985'

In [None]:
# Create the 'commonA' category column, initialised to 0 for every row
df['commonA'] = 0

In [None]:
# Vectorised assignment of the year-overlap category in 'commonA'.
# (An earlier row-wise df.apply draft based on the one_rep_ID columns was
# superseded by this version.)
has_1945 = df['ID_1945'].notna()
has_1975 = df['ID_1975'].notna()
has_1985 = df['ID_1985'].notna()

# np.select uses the first condition that matches each row, so the
# all-three-years test has to come before the pairwise tests.
conditions = [
    has_1945 & has_1985 & has_1975,  # present in all three years
    has_1945 & has_1985,             # present in 1945 and 1985
    has_1945 & has_1975,             # present in 1945 and 1975
    has_1975 & has_1985,             # present in 1975 and 1985
]

# Category code for each condition above, in the same order
choices = [4, 3, 2, 1]

# Rows matching none of the conditions get 0
df['commonA'] = np.select(conditions, choices, default=0)

In [None]:
# Sanity check: count rows per (non-exclusive) overlap condition.
# The pairwise counts also include rows that are present in all three years.
in_1945 = df['ID_1945'].notna()
in_1975 = df['ID_1975'].notna()
in_1985 = df['ID_1985'].notna()

print("All three IDs not null:", (in_1945 & in_1985 & in_1975).sum())
print("ID_1945 and ID_1985 not null:", (in_1945 & in_1985).sum())
print("ID_1945 and ID_1975 not null:", (in_1945 & in_1975).sum())
print("ID_1975 and ID_1985 not null:", (in_1975 & in_1985).sum())


All three IDs not null: 314
ID_1945 and ID_1985 not null: 508
ID_1945 and ID_1975 not null: 889
ID_1975 and ID_1985 not null: 7189


In [None]:
# Assign overlap categories from least to most specific. Later assignments
# overwrite earlier ones, and a row present in all three years satisfies every
# pairwise condition too, so the all-three assignment (4) must run LAST;
# otherwise those rows would end up labelled with a pairwise code.
# Two different pairwise conditions can only both hold when all three IDs are
# present, so the order among the pairwise assignments does not matter.
df.loc[(df['ID_1975'].notna() & df['ID_1985'].notna()), 'commonA'] = 1
df.loc[(df['ID_1945'].notna() & df['ID_1975'].notna()), 'commonA'] = 2
df.loc[(df['ID_1945'].notna() & df['ID_1985'].notna()), 'commonA'] = 3
df.loc[(df['ID_1945'].notna() & df['ID_1985'].notna() & df['ID_1975'].notna()), 'commonA'] = 4


In [None]:
# Presence of each year's ID, computed once and reused below
present_1945 = df['ID_1945'].notna()
present_1975 = df['ID_1975'].notna()
present_1985 = df['ID_1985'].notna()

# Reset, then label each row with a mutually exclusive overlap category:
#   1 = 1975 & 1985 only, 2 = 1945 & 1975 only, 3 = 1945 & 1985 only, 4 = all three
df['commonA'] = 0
df.loc[~present_1945 & present_1975 & present_1985, 'commonA'] = 1
df.loc[present_1945 & present_1975 & ~present_1985, 'commonA'] = 2
df.loc[present_1945 & ~present_1975 & present_1985, 'commonA'] = 3
df.loc[present_1945 & present_1975 & present_1985, 'commonA'] = 4

In [None]:
# Export the rows labelled 4 in 'commonA' (present in all three years), without the index
common_in_all = df.loc[df['commonA'].eq(4)]
common_in_all.to_csv('common_in_all.csv', index=False)