In [1]:
import findspark
import os
# Let findspark set up the environment so the pyspark imports below resolve.
findspark.init()

# Point the JVM lookup at a specific JDK 11 install.
# NOTE(review): this is an absolute, machine-specific Windows path — anyone
# running the notebook elsewhere has to edit it (or export JAVA_HOME themselves).
os.environ['JAVA_HOME'] = 'C:\\Program Files\\OpenLogic\\jdk-11.0.25.9-hotspot\\'

# Spark master URL used when building the session below (local mode).
spark_url = 'local[*]'
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, explode

# Session-level settings, applied to the builder in the order listed here.
spark_settings = {
    'spark.ui.port': '4040',
    'spark.driver.memory': '8g',
    'spark.executor.memory': '8g',
    'spark.executor.cores': '4',
    'spark.memory.fraction': '0.8',
    'spark.driver.maxResultSize': '2g',
}

# Start from the master URL and application name, then layer the settings on top.
session_builder = (
    SparkSession.builder
    .master(spark_url)
    .appName('Spark Tutorial')
)
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# Create the session (or reuse one that already exists) and expose its context.
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [2]:
import os

# Recursively collect the path of every .json file under the data folder
folder_path = "data"
json_files = [
    os.path.join(dirpath, filename)
    for dirpath, _dirnames, filenames in os.walk(folder_path)
    for filename in filenames
    if filename.endswith('.json')
]

# Load every collected file with a single read call; multiLine=True lets
# one JSON document span several lines of a file
paperDF = spark.read.json(json_files, multiLine=True)

In [3]:
# Print the schema Spark inferred for the loaded papers as an indented tree
paperDF.printSchema()

root
 |-- abstracts-retrieval-response: struct (nullable = true)
 |    |-- affiliation: string (nullable = true)
 |    |-- authkeywords: struct (nullable = true)
 |    |    |-- author-keyword: string (nullable = true)
 |    |-- authors: struct (nullable = true)
 |    |    |-- author: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- @_fa: string (nullable = true)
 |    |    |    |    |-- @auid: string (nullable = true)
 |    |    |    |    |-- @seq: string (nullable = true)
 |    |    |    |    |-- affiliation: string (nullable = true)
 |    |    |    |    |-- author-url: string (nullable = true)
 |    |    |    |    |-- ce:alias: string (nullable = true)
 |    |    |    |    |-- ce:alt-name: string (nullable = true)
 |    |    |    |    |-- ce:degrees: string (nullable = true)
 |    |    |    |    |-- ce:given-name: string (nullable = true)
 |    |    |    |    |-- ce:indexed-name: string (nullable = true)
 |    |    |    |    |

In [3]:
# Load the country table: the first line supplies the column names and
# Spark infers each column's type from the data
df = spark.read.options(header=True, inferSchema=True).csv('countries.csv')

# Preview the first rows
df.show()

+-------+----------+-----------+--------------------+
|country|  latitude|  longitude|                name|
+-------+----------+-----------+--------------------+
|     AD| 42.546245|   1.601554|             Andorra|
|     AE| 23.424076|  53.847818|United Arab Emirates|
|     AF|  33.93911|  67.709953|         Afghanistan|
|     AG| 17.060816| -61.796428| Antigua and Barbuda|
|     AI| 18.220554| -63.068615|            Anguilla|
|     AL| 41.153332|  20.168331|             Albania|
|     AM| 40.069099|  45.038189|             Armenia|
|     AN| 12.226079| -69.060087|Netherlands Antilles|
|     AO|-11.202692|  17.873887|              Angola|
|     AQ|-75.250973|  -0.071389|          Antarctica|
|     AR|-38.416097| -63.616672|           Argentina|
|     AS|-14.270972|-170.132217|      American Samoa|
|     AT| 47.516231|  14.550072|             Austria|
|     AU|-25.274398| 133.775136|           Australia|
|     AW|  12.52111| -69.968338|               Aruba|
|     AZ| 40.143105|  47.576

In [4]:
# Load the city table: the first line supplies the column names and
# Spark infers each column's type from the data
worldcities_df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('worldcities.csv')
)

# Preview the first rows
worldcities_df.show()

+------------+------------+--------+--------+-------------+----+----+--------------------+-------+-----------+----------+
|        city|  city_ascii|     lat|     lng|      country|iso2|iso3|          admin_name|capital| population|        id|
+------------+------------+--------+--------+-------------+----+----+--------------------+-------+-----------+----------+
|       Tokyo|       Tokyo| 35.6897|139.6922|        Japan|  JP| JPN|               Tōkyō|primary|   3.7732E7|1392685764|
|     Jakarta|     Jakarta|  -6.175|106.8275|    Indonesia|  ID| IDN|             Jakarta|primary|   3.3756E7|1360771077|
|       Delhi|       Delhi|   28.61|   77.23|        India|  IN| IND|               Delhi|  admin|   3.2226E7|1356872604|
|   Guangzhou|   Guangzhou|   23.13|  113.26|        China|  CN| CHN|           Guangdong|  admin|    2.694E7|1156237133|
|      Mumbai|      Mumbai| 19.0761| 72.8775|        India|  IN| IND|         Mahārāshtra|  admin|   2.4973E7|1356226629|
|      Manila|      Mani

In [5]:
from pyspark.sql.functions import col, when, expr

# Label each affiliation string by its first character:
# "[" -> JSON array, "{" -> JSON object, anything else -> other
affiliation_text = col("affiliation")
row_type_label = (
    when(affiliation_text.startswith("["), "array")
    .when(affiliation_text.startswith("{"), "object")
    .otherwise("other")
)

# Keep only the affiliation field, discard rows where it is null, and attach the label
affiliation_df = (
    paperDF
    .select("abstracts-retrieval-response.affiliation")
    .dropna()
    .withColumn("row_type", row_type_label)
)

# Split by label; array rows are parsed as an array of strings and exploded
# so that each array element becomes its own row
array_df = (
    affiliation_df
    .filter(col("row_type") == "array")
    .withColumn("affiliation", expr("explode(from_json(affiliation, 'array<string>'))"))
)
object_df = affiliation_df.filter(col("row_type") == "object")
other_df = affiliation_df.filter(col("row_type") == "other")

# Preview each subset in turn
for subset_df in (array_df, object_df, other_df):
    subset_df.show()

+--------------------+--------+
|         affiliation|row_type|
+--------------------+--------+
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
|{"affiliation-cit...|   array|
+--------------------+--------+
only showing top 20 rows

+--------------------+--------+
|         affiliation|row_type|
+--------------------+--------+
|{"affiliation-cit...|  object|
|{"affiliation-cit...|  object|
|{"affiliation-cit...|  object|
|{"affiliation

In [11]:


# Print three untruncated sample rows from array_df, then from object_df
for sample_df in (array_df, object_df):
    sample_df.show(3, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|affiliation                                                                                                                                                                                                    |row_type|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|{"affiliation-city":"Rome","@id":"60032350","affilname":"Sapienza Università di Roma","@href":"https://api.elsevier.com/content/affiliation/affiliation_id/60032350","affiliation-country":"Italy"}            |array   |
|{"affiliation-city":"Sao Paulo","@id":"60008088","affilname":"Universidade de São Paulo","@href":"https://api.elsevier.com/

In [16]:
from pyspark.sql.functions import from_json, schema_of_json

# Derive the parsing schema from a sample document that lists the three
# fields of interest, each with a string value
affiliation_schema = schema_of_json("""
{
    "affilname": "string",
    "affiliation-city": "string",
    "affiliation-country": "string"
}
""")


def _expand_affiliation(source_df):
    """Parse the JSON text in ``affiliation`` and return its fields as flat columns.

    The result has exactly three columns: ``affiliation_name``,
    ``affiliation_city`` and ``affiliation_country``.
    """
    parsed_affiliation = from_json(col("affiliation"), affiliation_schema)
    return source_df.withColumn("affiliation", parsed_affiliation).select(
        col("affiliation.affilname").alias("affiliation_name"),
        col("affiliation.affiliation-city").alias("affiliation_city"),
        col("affiliation.affiliation-country").alias("affiliation_country"),
    )


# Apply the same expansion to both subsets
expanded_array_df = _expand_affiliation(array_df)
expanded_object_df = _expand_affiliation(object_df)

# Preview the expanded subsets
for expanded_df in (expanded_array_df, expanded_object_df):
    expanded_df.show()

+--------------------+----------------+-------------------+
|    affiliation_name|affiliation_city|affiliation_country|
+--------------------+----------------+-------------------+
|Sapienza Universi...|            Rome|              Italy|
|Universidade de S...|       Sao Paulo|             Brazil|
|Universidad de Bu...|    Buenos Aires|          Argentina|
|Université Côte d...|            Nice|             France|
|Washington Univer...|       St. Louis|      United States|
|Universidad de Chile|        Santiago|              Chile|
|Karolinska Instit...|       Stockholm|             Sweden|
|Albert Einstein C...|        New York|      United States|
|Université Paris ...|           Paris|             France|
|Universidad de Se...|         Sevilla|              Spain|
|University of Mic...|       Ann Arbor|      United States|
| Universität zu Köln|            Koln|            Germany|
| Kræftens Bekæmpelse|      Copenhagen|            Denmark|
|The University of...|         Houston| 

In [23]:
# Stack the affiliations expanded from array rows on top of those expanded
# from object rows; both frames have the same three columns in the same order.
combined_df = expanded_array_df.union(expanded_object_df)

# Show the combined DataFrame
combined_df.show()

+--------------------+----------------+-------------------+
|    affiliation_name|affiliation_city|affiliation_country|
+--------------------+----------------+-------------------+
|Sapienza Universi...|            Rome|              Italy|
|Universidade de S...|       Sao Paulo|             Brazil|
|Universidad de Bu...|    Buenos Aires|          Argentina|
|Université Côte d...|            Nice|             France|
|Washington Univer...|       St. Louis|      United States|
|Universidad de Chile|        Santiago|              Chile|
|Karolinska Instit...|       Stockholm|             Sweden|
|Albert Einstein C...|        New York|      United States|
|Université Paris ...|           Paris|             France|
|Universidad de Se...|         Sevilla|              Spain|
|University of Mic...|       Ann Arbor|      United States|
| Universität zu Köln|            Koln|            Germany|
| Kræftens Bekæmpelse|      Copenhagen|            Denmark|
|The University of...|         Houston| 

In [None]:
from pyspark.sql.functions import coalesce

# Attach city-level coordinates to every affiliation by matching on the ASCII
# city name and the country name. The left join keeps affiliations without a
# match; their coordinates come out as null.
# NOTE(review): if worldcities.csv contains several rows for the same
# (city_ascii, country) pair, the join emits one output row per match —
# confirm whether the lookup table needs de-duplicating first.
city_match = (
    (combined_df.affiliation_city == worldcities_df.city_ascii)
    & (combined_df.affiliation_country == worldcities_df.country)
)

# worldcities_df stores its coordinates as `lat` / `lng`; alias them to
# `latitude` / `longitude`, the names the later cells filter and select on.
mapped_df = (
    combined_df
    .join(worldcities_df, on=city_match, how='left')
    .select(
        'affiliation_name',
        'affiliation_city',
        'affiliation_country',
        col('lat').alias('latitude'),
        col('lng').alias('longitude'),
    )
)

# Show the mapped DataFrame
mapped_df.show()

+--------------------+----------------+-------------------+--------+---------+
|    affiliation_name|affiliation_city|affiliation_country|latitude|longitude|
+--------------------+----------------+-------------------+--------+---------+
|Sapienza Universi...|            Rome|              Italy| 41.8933|  12.4828|
|Universidade de S...|       Sao Paulo|             Brazil|  -23.55| -46.6333|
|Universidad de Bu...|    Buenos Aires|          Argentina|-34.6033| -58.3817|
|Université Côte d...|            Nice|             France| 43.7034|   7.2663|
|Washington Univer...|       St. Louis|      United States| 38.6359| -90.2451|
|Universidad de Chile|        Santiago|              Chile|-33.4372| -70.6506|
|Karolinska Instit...|       Stockholm|             Sweden| 59.3294|  18.0686|
|Albert Einstein C...|        New York|      United States| 40.6943| -73.9249|
|Université Paris ...|           Paris|             France| 48.8567|   2.3522|
|Universidad de Se...|         Sevilla|             

In [31]:
# A row counts as located only when both coordinates are present.
# isNotNull() never yields null itself, so this predicate and its negation
# split mapped_df into two exhaustive, non-overlapping parts.
has_coordinates = col('latitude').isNotNull() & col('longitude').isNotNull()

# Rows missing at least one coordinate (these get the country-level fallback later)
null_lat_long_df = mapped_df.filter(~has_coordinates)

# Rows with a complete coordinate pair
filtered_mapped_df = mapped_df.filter(has_coordinates)

# Show the rows that still need coordinates
null_lat_long_df.show()

# Show the rows that already have coordinates
filtered_mapped_df.show()

+--------------------+--------------------+-------------------+--------+---------+
|    affiliation_name|    affiliation_city|affiliation_country|latitude|longitude|
+--------------------+--------------------+-------------------+--------+---------+
| Universität zu Köln|                Koln|            Germany|    NULL|     NULL|
|           KU Leuven|         3000 Leuven|            Belgium|    NULL|     NULL|
|NUS Yong Loo Lin ...|      Singapore City|          Singapore|    NULL|     NULL|
|Faculty of Medici...|             Clayton|          Australia|    NULL|     NULL|
|      Uniklinik Köln|                Koln|            Germany|    NULL|     NULL|
|Goethe-Universitä...|   Frankfurt am Main|            Germany|    NULL|     NULL|
|Universidad de la...|San Cristobal de ...|              Spain|    NULL|     NULL|
|Università degli ...|               Padua|              Italy|    NULL|     NULL|
|Departement Cellu...|              Leuven|            Belgium|    NULL|     NULL|
|For

In [36]:
# Remove the latitude/longitude columns left over from the city match so the
# country table can supply them instead
cleaned_null_lat_long_df = null_lat_long_df.drop('latitude', 'longitude')

# Fall back to country-level coordinates: match the affiliation country
# against the country name in df, keep unmatched affiliations (left join),
# and reduce the result to the five output columns
country_match = cleaned_null_lat_long_df['affiliation_country'] == df['name']
mapped_country_df = (
    cleaned_null_lat_long_df
    .join(df, on=country_match, how='left')
    .select(
        'affiliation_name',
        'affiliation_city',
        'affiliation_country',
        'latitude',
        'longitude',
    )
)

# Preview the country-level matches
mapped_country_df.show()

+--------------------+--------------------+-------------------+----------+-----------+
|    affiliation_name|    affiliation_city|affiliation_country|  latitude|  longitude|
+--------------------+--------------------+-------------------+----------+-----------+
| Universität zu Köln|                Koln|            Germany| 51.165691|  10.451526|
|           KU Leuven|         3000 Leuven|            Belgium| 50.503887|   4.469936|
|NUS Yong Loo Lin ...|      Singapore City|          Singapore|  1.352083| 103.819836|
|Faculty of Medici...|             Clayton|          Australia|-25.274398| 133.775136|
|      Uniklinik Köln|                Koln|            Germany| 51.165691|  10.451526|
|Goethe-Universitä...|   Frankfurt am Main|            Germany| 51.165691|  10.451526|
|Universidad de la...|San Cristobal de ...|              Spain| 40.463667|   -3.74922|
|Università degli ...|               Padua|              Italy|  41.87194|   12.56738|
|Departement Cellu...|              Leuven|

In [38]:
# Stack the city-matched rows and the country-fallback rows into one DataFrame.
# union() pairs columns by position; both inputs were built by selecting the
# same five column names in the same order.
final_df = filtered_mapped_df.union(mapped_country_df)

# Collect the whole result into a pandas DataFrame
final_pandas_df = final_df.toPandas()

# Write it to affil_location.csv, leaving out the pandas index column
final_pandas_df.to_csv('affil_location.csv', index=False)