### **Spark**

In [1]:
!pip install -U pyspark



In [2]:
import pandas as pd
import os

pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
if not "pyspark-shell" in pyspark_submit_args: pyspark_submit_args += " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
spark_home = os.environ.get("SPARK_HOME")

In [3]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

# sc.stop()
sc = SparkContext() 
config = sc.getConf()
config.set('spark.cores.max','4')
config.set('spark.executor.memory', '8G')
config.set('spark.driver.maxResultSize', '8g')
config.set('spark.kryoserializer.buffer.max', '512m')
config.set("spark.driver.cores", "4")

sc.stop()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/28 19:50:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
sc = SparkContext(conf = config) 
sqlContext = SQLContext(sc)
print("Using Apache Spark Version", sc.version)

Using Apache Spark Version 3.4.0




#### **Read CSV file into Spark DataFrame**

In [5]:
cb_file = "metadata.csv"

In [6]:
sdf = sqlContext.read.format("csv") \
                        .options(header='true', inferschema='true', treatEmptyValuesAsNulls='true') \
                        .load(cb_file)
sdf.count()

                                                                                

1056660

In [7]:
selected_columns = [sdf.columns[i-1] for i in [1, 3, 4, 9, 10, 11, 12, 18]]
sdf_filtered = sdf.select(selected_columns)

In [8]:
sdf_filtered.show(10)

+--------+--------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+
|cord_uid|source_x|               title|            abstract|publish_time|             authors|             journal|                 url|
+--------+--------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+
|ug7v899j|     PMC|Clinical features...|OBJECTIVE: This r...|  2001-07-04|Madani, Tariq A; ...|      BMC Infect Dis|https://www.ncbi....|
|02tnwd4m|     PMC|Nitric oxide: a p...|Inflammatory dise...|  2000-08-15|Vliet, Albert van...|          Respir Res|https://www.ncbi....|
|ejv2xln0|     PMC|Surfactant protei...|Surfactant protei...|  2000-08-25|     Crouch, Erika C|          Respir Res|https://www.ncbi....|
|2b73a28n|     PMC|Role of endotheli...|Endothelin-1 (ET-...|  2001-02-22|Fagan, Karen A; M...|          Respir Res|https://www.ncbi....|
|9785vg6d|     PMC|Gene expression

In [9]:
sdf_clean = sdf_filtered.na.drop()

In [10]:
sdf_clean.count()

                                                                                

509210

In [11]:
# check data schema
sdf_clean.printSchema()

root
 |-- cord_uid: string (nullable = true)
 |-- source_x: string (nullable = true)
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- journal: string (nullable = true)
 |-- url: string (nullable = true)



In [18]:
sdf_clean.write.json("metadata_clean.json")

                                                                                

In [20]:
# spark.stop()

### **MongoDB**

In [12]:
from pymongo import MongoClient

# connect to mongoDB
client = MongoClient('localhost',27017)
db = client.project
collection = db.metadata

In [13]:
import json
import glob

json_files = glob.glob("metadata_clean.json/part-*.json")

# Read and insert data from each JSON file
for json_file in json_files:
    with open(json_file, "r") as file:
        data = [json.loads(line) for line in file]
        collection.insert_many(data)

In [14]:
collection.count_documents({})

1018420

In [16]:
# collection.drop()

#### **By Title Function**

In [38]:
def details(title):
    # Search for records containing the input string in the title field
    query_title = {"title": {"$regex": title, "$options": "i"}}
    cursor_title = collection.find(query_title)

    # Convert the cursor to a Pandas DataFrame
    records_title = list(cursor_title)
    df_title = pd.DataFrame(records_title)

    # Filter the DataFrame to include only the desired columns
    columns_title = ["cord_uid", "source_x", "title", "abstract", "publish_time", "authors", "journal"]
    df_title = df_title[columns_title]

    return df_title

In [39]:
result_df = details("covid")
print(result_df)

        cord_uid source_x                                              title  \
0       fyorl3ks      WHO  Sports Students' Satisfaction with Their Forei...   
1       cogrn31y      WHO  Minor Clinical Impact of COVID-19 Pandemic on ...   
2       kobwu7vg      WHO  COVCOG 1: Factors predicting Cognitive Symptom...   
3       qgbtjyj2    ArXiv  C-Watcher: A Framework for Early Detection of ...   
4       pfgyz697      WHO  The COVID-19 Infodemic: A Quantitative Analysi...   
...          ...      ...                                                ...   
174179  sbaucmn6      WHO  Addressing the global surge of COVID-19 cases:...   
174180  uywwa5zf      WHO  [Practice of urological departments during the...   
174181  rof1c4va      WHO  Maladie de Kawasaki de l'adulte post-COVID 19 ...   
174182  4bowti05      WHO  IoT role in prevention of COVID-19 and health ...   
174183  4ct2vb3e      WHO  Clinical and radiologic features of the first ...   

                                       

#### **By Author Function**

In [52]:
def details(authors):
    # Search for records containing the input string in the title field
    query_authors = {"authors": {"$regex": authors, "$options": "i"}}
    cursor_authors = collection.find(query_authors)

    # Convert the cursor to a Pandas DataFrame
    records_authors = list(cursor_authors)
    df_authors = pd.DataFrame(records_authors)

    # Filter the DataFrame to include only the desired columns
    columns_authors = ["cord_uid", "source_x", "title", "abstract", "publish_time", "authors", "journal"]
    df_authors = df_authors[columns_authors]

    return df_authors

In [53]:
df_authors = details("Sam")
print(df_authors)

       cord_uid source_x                                              title  \
0      nek2ctqy      WHO               Digital Contact Tracing for Covid 19   
1      vwd6tho6    ArXiv               Digital Contact Tracing for Covid 19   
2      vt48ah4r      WHO  Teledentistry Protocol for Patient Assistance ...   
3      937303jy      WHO  Organizational challenges and oncological acti...   
4      7zje6wq1      WHO  [Diagnosztikus lépések és a betegség prognózis...   
...         ...      ...                                                ...   
20724  g4h3zi8l      WHO  "Impact of the COVID-19 pandemic on orthopedic...   
20725  g4h3zi8l      WHO  "Impact of the COVID-19 pandemic on orthopedic...   
20726  ejy6v0uz    ArXiv  TURINGBENCH: A Benchmark Environment for Turin...   
20727  t612lxb1      WHO  Longitudinal and cross-sectional detection of ...   
20728  6twxq5k0      WHO  Pay to skip the line: The political economy of...   

                                                abs

#### **By Source Function**

In [91]:
def details(source):
    # Search for records containing the input string in the title field
    query_source = {"source_x": {"$regex": source, "$options": "i"}}
    cursor_source = collection.find(query_source)

    # Convert the cursor to a Pandas DataFrame
    records_source = list(cursor_source)
    df_source = pd.DataFrame(records_source)

    # Filter the DataFrame to include only the desired columns
    columns_source = ["cord_uid", "source_x", "title", "abstract", "publish_time", "authors", "journal"]
    df_source = df_source[columns_source]

    return df_source

#### **By Journal Function**

In [92]:
def details(journal):
    # Search for records containing the input string in the title field
    query_journal = {"journal": {"$regex": journal, "$options": "i"}}
    cursor_journal = collection.find(query_journal)

    # Convert the cursor to a Pandas DataFrame
    records_journal = list(cursor_journal)
    df_journal = pd.DataFrame(records_journal)

    # Filter the DataFrame to include only the desired columns
    columns_journal = ["cord_uid", "source_x", "title", "abstract", "publish_time", "authors", "journal"]
    df_journal = df_journal[columns_journal]

    return df_journal

In [93]:
df_journal = details("BMC")
print(df_journal)

      cord_uid           source_x  \
0     1nl29xrg  Medline; PMC; WHO   
1     7shw6xht       Medline; PMC   
2     vl1gadl9       Medline; PMC   
3     pqbc0dug       Medline; PMC   
4     ih09zi80       Medline; PMC   
...        ...                ...   
9289  s9iugbtv            Medline   
9290  iwhcul3u            Medline   
9291  8ojypu4m            Medline   
9292  saol972y            Medline   
9293  20x021ym            Medline   

                                                  title  \
0     Toward finding the difference between untreate...   
1     COVID-19 infection control measures and outcom...   
2     Effective public health measures to mitigate t...   
3     The impact of the COVID-19 pandemic on final y...   
4     The experiences, needs and barriers of people ...   
...                                                 ...   
9289  Reliability, validity and psychometric propert...   
9290  The link between thyroid autoimmunity (antithy...   
9291  The safety and eff

### **Flask**

In [15]:
from flask import Flask, request, jsonify, render_template
import pandas as pd
from pymongo import MongoClient

app = Flask(__name__)

client = MongoClient('localhost', 27017)
db = client.project
collection = db.metadata

def details(query_field, query_value):
    query = {query_field: {"$regex": query_value, "$options": "i"}}
    cursor = collection.find(query)
    records = list(cursor)
    df = pd.DataFrame(records)

    columns = ["cord_uid", "source_x", "title", "abstract", "publish_time", "authors", "journal"]
    df = df[columns]

    return df

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/search/title', methods=['GET'])
def search_title():
    title = request.args.get('title', '')
    df_title = details("title", title)
    return jsonify(df_title.to_dict(orient='records'))

@app.route('/search/authors', methods=['GET'])
def search_authors():
    authors = request.args.get('authors', '')
    df_authors = details("authors", authors)
    return jsonify(df_authors.to_dict(orient='records'))

@app.route('/search/source', methods=['GET'])
def search_source():
    source = request.args.get('source', '')
    df_source = details("source_x", source)
    return jsonify(df_source.to_dict(orient='records'))

@app.route('/search/journal', methods=['GET'])
def search_journal():
    journal = request.args.get('journal', '')
    df_journal = details("journal", journal)
    return jsonify(df_journal.to_dict(orient='records'))

if __name__ == '__main__':
    app.run(debug=True)


 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: on


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with watchdog (fsevents)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.9/site-packages/ipykernel_launcher.py", line 15, in <module>
    from ipykernel import kernelapp as app
  File "/opt/anaconda3/lib/python3.9/site-packages/ipykernel/kernelapp.py", line 18, in <module>
    from IPython.core.application import (
  File "/opt/anaconda3/lib/python3.9/site-packages/IPython/__init__.py", line 56, in <module>
    from .terminal.embed import embed
  File "/opt/anaconda3/lib/python3.9/site-packages/IPython/terminal/embed.py", line 16, in <module>
    from IPython.terminal.interactiveshell import TerminalInteractiveShell
  File "/opt/anaconda3/lib/python3.9/site-packages/IPython/terminal/interactiveshell.py", line 35, in <module>
    from .debugger import TerminalPdb, Pdb
  File "/opt/anaconda3/lib/python3.9/site-packages/IPython/terminal/debugger.py", line 6, in <module>
    from IPython.core.co

SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
