# Python scripts to generate metadata for Europeana objects which are created by persons (dc:creator, DBpedia URIs)

In [None]:
%pip install SPARQLWrapper
%pip install pandas
%pip install numpy
%pip install fsspec
%pip install rdfpandas

In [1]:
import os
from pathlib import Path
from SPARQLWrapper import SPARQLWrapper, JSON, XML, TURTLE, N3, RDF, RDFXML, CSV, TSV, JSONLD, DIGEST
import pandas as pd
from random import randint
import numpy as np
import requests
from urllib import parse, error
import fsspec
from rdfpandas.graph import to_graph, to_dataframe
from rdflib import Graph, URIRef, Literal, BNode, Namespace
from rdflib.namespace import NamespaceManager,CSVW, DC, DCAT, DCTERMS, DOAP, FOAF, ODRL2, ORG, OWL, PROF, PROV, RDF, RDFS, SDO, SH, SKOS, SOSA, SSN, TIME, VOID, XMLNS, XSD

# For multithreading
#import concurrent.futures
from concurrent import futures
import threading
# For time recording
import time
import datetime
from datetime import timedelta

In [2]:
# Optional, Google Colab only: mount Google Drive first.
#from google.colab import drive, files
#drive.mount('/content/drive')

# Base locations for the data files: an empty prefix for a local run and the
# InTaVia folder on the mounted Drive for a Google Colab run.
filelocation_local = ''
filelocation_google = '/content/drive/MyDrive/Colab Notebooks/InTaVia/'

In [3]:
# Shell command: print the CPU details of the machine running this notebook.
!lscpu

Architecture:            arm64
Byte Order:              Little Endian
Total CPU(s):            8
Model name:              MacBookPro18,3


## Standalone Test for 2nd query for 1 record (Method 4: shortcut version without SPARQL i.e. HTTP request without RDFlib)
The API does not return the same data as SPARQL (e.g. the DBpedia link is missing, a Europeana Agent node is returned, etc.).

In [None]:
# Europeana URI used for the standalone test. Any Europeana URI works
# (i.e. http://data.europeana.eu/item/ or http://data.europeana.eu/proxy/europeana/).
# Exactly one assignment is active; the commented lines are alternative test URIs.
#europeanaURI = 'http://data.europeana.eu/item/2023859/_http___keptar_oszk_hu_025900_025984__'
europeanaURI = 'http://data.europeana.eu/item/15508/3710'
#europeanaURI = 'http://data.europeana.eu/proxy/europeana/2020903/KMS1811'
#europeanaURI = 'http://data.europeana.eu/proxy/provider/2032004/20270'
#wikidataURI = 'http://www.wikidata.org/entity/Q699736'
#dbpediaURI = 'http://dbpedia.org/resource/Zac_Posen'
#europeanaURI = 'http://data.europeana.eu/proxy/europeana/2026116/Partage_Plus_ProvidedCHO_Bildarchiv_Foto_Marburg_obj_20184057_LA_5_957_15a'
#dbpediaURI = 'http://dbpedia.org/resource/Gustav_Klimt'

# Request headers asking for the Turtle serialisation of the resource.
headers = {
    'Accept': 'text/turtle',
    'Content-type': 'text/turtle'
}

def europeana_http_request(europeanaURI, headers, timeout=3, max_attempts=3):
  """Fetch a Europeana resource over HTTP and return the response body as text.

  Connection and timeout errors are retried until ``max_attempts`` requests
  have been made. HTTP error statuses and any other request error are not
  retried. Every error is printed with its category.

  Args:
    europeanaURI: URI to request.
    headers: Dict of HTTP request headers (e.g. ``{'Accept': 'text/turtle'}``).
    timeout: Timeout in seconds handed to ``requests.get``.
    max_attempts: Maximum number of requests made for this URI (at least 1).

  Returns:
    The response body as ``str``, or ``None`` if no attempt succeeded.
  """
  for _attempt in range(max(1, max_attempts)):
    try:
      r = requests.get(europeanaURI, headers=headers, timeout=timeout)
      # Raises requests.exceptions.HTTPError for 4xx/5xx responses
      r.raise_for_status()
      return r.text
    except requests.exceptions.HTTPError as error_h:
      print("Http Error: ", error_h)
      return None
    except requests.exceptions.ConnectionError as error_c:
      # Possibly transient: fall through to the next attempt
      print("Connection Error: ", error_c)
    except requests.exceptions.Timeout as error_t:
      # Possibly transient: fall through to the next attempt
      print("Timeout Error: ", error_t)
    except requests.exceptions.RequestException as error_o:
      print("All Other Error: ", error_o)
      return None
  # Every attempt ended in a connection or timeout error
  return None
# Fetch the test URI and show the returned text (None is printed if the request failed).
turtle = europeana_http_request(europeanaURI, headers)
print(turtle)

**Create a list of files in the CSV folder**

In [12]:
# Folder holding the CSV files with the Europeana URI lists
dir_path = 'EuropeanaObjectURIlist/Contributor'
# Keep only regular files (no sub-folders) and order the names as plain strings,
# so e.g. '..._100000.csv' sorts before '..._20000.csv'.
list_file = sorted(
    entry for entry in os.listdir(dir_path)
    if os.path.isfile(os.path.join(dir_path, entry))
)
print(list_file)


['OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_0.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_10000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_100000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_110000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_120000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_130000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_140000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_150000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_160000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_170000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_180000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_190000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_20000.csv', 'OnlyInTaVia_at_nl_fi_si_URI_list_df_WikidataContributor_200000.csv', 'OnlyInTaVia_at_nl_fi_si_U

In [6]:
# Write log records of level INFO and above to the file logs.log
import logging

logging.basicConfig(filename='logs.log', level=logging.INFO)
# Record the list of CSV files via the root logger
logging.getLogger().info(list_file)

## Multi-threading 

**Multi-threading test: downloading 10 images**

In [None]:
import requests
import time
#import concurrent.futures
from concurrent import futures
import threading

# Image URLs used for the threading benchmark below.
# Several URLs appear more than once; download_image derives the file name from
# the URL, so those downloads write to the same file.
img_urls = [
    'https://images.unsplash.com/photo-1516117172878-fd2c41f4a759',
    'https://images.unsplash.com/photo-1524429656589-6633a470097c',
    'https://images.unsplash.com/photo-1516972810927-80185027ca84',
    'https://images.unsplash.com/photo-1516117172878-fd2c41f4a759',
    'https://images.unsplash.com/photo-1550439062-609e1531270e',
    'https://images.unsplash.com/photo-1532009324734-20a7a5813719',
    'https://images.unsplash.com/photo-1516972810927-80185027ca84',
    'https://images.unsplash.com/photo-1550439062-609e1531270e',
    'https://images.unsplash.com/photo-1516117172878-fd2c41f4a759',
    'https://images.unsplash.com/photo-1516972810927-80185027ca84',
]

def download_image(img_url, timeout=10):
    """Download one image and save it in the working directory as ``<name>.jpg``.

    ``<name>`` is the fourth ``/``-separated piece of the URL, i.e. the first
    path segment after the host for an ``https://host/segment`` URL.

    Args:
        img_url: URL of the image.
        timeout: Timeout in seconds handed to ``requests.get``, so a stalled
            connection cannot block a worker thread indefinitely.

    Raises:
        requests.exceptions.RequestException: If the request fails, times out
            or returns an HTTP error status. Nothing is written in that case,
            so an error page is never saved as a ``.jpg`` file.
    """
    response = requests.get(img_url, timeout=timeout)
    response.raise_for_status()
    img_name = img_url.split('/')[3]
    img_name = f'{img_name}.jpg'
    with open(img_name, 'wb') as img_file:
        img_file.write(response.content)
    print(f'{img_name} was downloaded...')

def _timing_summary(started, finished, duration, duration_seconds):
    """Return the 'Start time / Finish time / Duration' text shared by all reports.

    Args:
        started: ``datetime`` taken before the run.
        finished: ``datetime`` taken after the run.
        duration: Elapsed time formatted as a string.
        duration_seconds: Elapsed time in seconds.
    """
    nl = '\n'
    return (f'{nl} Start time  {started.strftime("%Y-%m-%d %H:%M:%S")} '
            f'{nl} Finish time {finished.strftime("%Y-%m-%d %H:%M:%S")} '
            f'{nl} Duration: {duration} = {duration_seconds} seconds')


n = '\n'

# --- Threaded run ---
t1 = time.perf_counter()
now1 = datetime.datetime.now()
# The with-block shuts the pool down (and joins its threads) once all downloads are done
with futures.ThreadPoolExecutor(max_workers=35) as ex:
    # Thread.name replaces getName(), which is deprecated since Python 3.10
    print('{}: is starting work'.format(threading.current_thread().name))
    # map() schedules one download per URL; list() waits for all of them
    results = ex.map(download_image, img_urls)
    real_results = list(results)
print('main: results: {}'.format(real_results))
t2 = time.perf_counter()
now2 = datetime.datetime.now()
duration_seconds = round(t2 - t1, 3)
duration = str(timedelta(seconds=duration_seconds))
print(f'THREADING Finished in {duration_seconds} seconds')
print('THREADING ' + _timing_summary(now1, now2, duration, duration_seconds))
# Write ('a': Add text. I.e. preserve existing texts)
with open('log_threading.txt', 'a') as f:
    # The trailing newline keeps appended runs from merging into one line
    f.write('THREADING: ' + _timing_summary(now1, now2, duration, duration_seconds) + n)

# --- Serial run, for comparison ---
t1 = time.perf_counter()
now1 = datetime.datetime.now()
for img_url in img_urls:
    download_image(img_url)
t2 = time.perf_counter()
now2 = datetime.datetime.now()
duration_seconds = round(t2 - t1, 3)
duration = str(timedelta(seconds=duration_seconds))

print(f'SERIAL Finished in {duration_seconds} seconds')
print('SERIAL ' + _timing_summary(now1, now2, duration, duration_seconds))
# Write ('a': Add text. I.e. preserve existing texts)
with open('log_serial.txt', 'a') as f:
    # The trailing newline keeps appended runs from merging into one line
    f.write('SERIAL ' + _timing_summary(now1, now2, duration, duration_seconds) + n)

# Output result:
#THREADING: Finished in 1.9588102889999846 seconds
#SERIAL: Finished in 2.8489685149999104 seconds

### Setup to save a log file

**MULTI-THREADING New way with HTTP request: 2nd Queries to fetch CH object metadata (Caution long processing time!)**

In [None]:
# Test with 1 iteration
#n4 = np.array([770000])
#n4 = np.array(list_file[-2:])
#print(n4)
#list_file = list_file[1:4]
# Show the file names that are about to be processed.
print(list_file)

def multiple_files_download(list_file, input_dir='EuropeanaObjectURIlist/Contributor',
                            output_dir='EuropeanaObjectTurtle/test/Contributor',
                            max_workers=2):
  """Download the Turtle description of every Europeana URI listed in a set of CSV files.

  For each CSV file name in ``list_file`` the ``Europeana_proxy`` column is
  read, a sub-folder named after the CSV file is created under ``output_dir``
  and every URI is fetched with ``europeana_http_request`` on a thread pool.
  Each response is saved as ``<URI with '/' replaced by '_'>.ttl``. URIs whose
  ``.ttl`` file already exists are skipped without a new HTTP request, so an
  interrupted run can be resumed. A timing log ``<csv name>_log_threading.txt``
  is written to ``output_dir`` for every CSV file.

  Args:
    list_file: Iterable of CSV file names located in ``input_dir``.
    input_dir: Folder holding the CSV files.
    output_dir: Folder receiving one sub-folder and one timing log per CSV file.
    max_workers: Number of download threads used per CSV file.

  Returns:
    None.
  """
  headers = {
    'Accept': 'text/turtle',
    'Content-type': 'text/turtle'
  }
  nl = '\n'

  for i, n in enumerate(list_file):
    print(f'Processing {str(n)} started as no {str(i)}')
    URI_list_df = pd.read_csv(os.path.join(input_dir, str(n)))
    list_Europeana_proxy = URI_list_df['Europeana_proxy'].to_list()

    # One output folder per CSV file. makedirs with exist_ok also creates missing
    # parents and does not fail when the notebook is re-run.
    folder_path = os.path.join(output_dir, str(n))
    os.makedirs(folder_path, exist_ok=True)

    def singlefile_download(item):
      """Fetch one URI and save it as a .ttl file unless that file already exists."""
      print(f'Processing {str(item)} started')
      # str() keeps a non-string cell (e.g. an empty CSV value) from raising here
      filename = str(item).replace('/', '_')
      path_to_file = f'{folder_path}/{str(filename)}.ttl'
      # Check the disk before the request so existing files cost no HTTP call
      if Path(path_to_file).is_file():
        print(f'The file {path_to_file} already exists, thus no need to save and download to local')
      else:
        turtle_europeana = europeana_http_request(item, headers)
        if turtle_europeana is not None:
          print(f'{filename} is fetched')
          # Explicit encoding: the platform default may not cover every character
          with open(path_to_file, 'w', encoding='utf-8') as f:
            f.write(turtle_europeana)
          print(f'The file {path_to_file} does not exist, thus saved and download to local')
        else:
          print(f'No Turtle can be fetched from {str(item)}')

      print(f'Processing {str(item)} completed')
      print('-----------------')

    # Time recording
    t1 = time.perf_counter()
    now1 = datetime.datetime.now()
    print('==========================================')
    print( f'{str(n)} is started {now1.strftime("%Y-%m-%d %H:%M:%S")}')
    print('==========================================')
    # The with-block shuts the pool down after each CSV file, so worker threads
    # do not pile up across files.
    with futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
      # list() waits for every download and re-raises the first worker exception
      list(ex.map(singlefile_download, list_Europeana_proxy))

    t2 = time.perf_counter()
    now2 = datetime.datetime.now()
    duration_seconds = t2 - t1
    duration = str(timedelta(seconds=duration_seconds))

    # Save the time recording log
    log_path = os.path.join(output_dir, str(n) + '_log_threading.txt')
    with open(log_path, 'w', encoding='utf-8') as f:
      f.write(f'{str(n)}: {nl} Start time  {now1.strftime("%Y-%m-%d %H:%M:%S")} {nl} Finish time {now2.strftime("%Y-%m-%d %H:%M:%S")} {nl} Duration: {duration} = {duration_seconds} seconds {nl} ------- {nl}')

    print('==========================================')
    print(f'{str(n)}: {nl} Start time  {now1.strftime("%Y-%m-%d %H:%M:%S")} {nl} Finish time {now2.strftime("%Y-%m-%d %H:%M:%S")} {nl} Duration: {duration} = {duration_seconds} seconds')
    print('==========================================')

# Run the download for every name in list_file.
multiple_files_download(list_file)

# Performance benchmark CPU
# 2 iterations with no sleep
# 4 Threading 2.72, 2.65 seconds
# Serial process 8.02, 7.56 seconds
#
# 2 iterations with 1 second sleep
# 4 Threading 6.78, 5.29 seconds
# Serial process 18.30, 18.00 seconds
#
# 2 iterations with 2 seconds sleep
# 4 Threading 8.70, 8.31 seconds
# Serial process 28.47, 27.92 seconds

# Performance benchmark GPU
# 2 iterations with no sleep
# 4 Threading 3.66, 3.38 seconds
# Serial process 11.47, 10.85 seconds
#
# 2 iterations with 1 second sleep
# 4 Threading 6.48, 6.55 seconds 
# Serial process 21.75, 20.34 seconds 
#
# 2 iterations with 2 seconds sleep
# 4 Threading 9.93, 9.16 seconds
# Serial process 31.74, 30.87 seconds

# Performance benchmark TPU
# 2 iterations with no sleep
# 4 Threading 2.98, 2.22 seconds
# Serial process 9.09, 8.15 seconds
#
# 2 iterations with 1 second sleep
# 4 Threading 5.53, 5.27 seconds 
# Serial process 18.40, 17.32 seconds 
#
# 2 iterations with 2 seconds sleep
# 4 Threading 8.85, 8.16 seconds
# Serial process 28.27, 29.33 seconds


### Calculate the number of downloaded files

In [3]:
def files_per_subfolder(dir_path):
    """Return the regular files found in each direct sub-folder of ``dir_path``.

    Args:
        dir_path: Folder whose direct sub-folders are inspected.

    Returns:
        A list with one entry per sub-folder, sub-folders taken in sorted order.
        Each entry is the sorted list of file names directly inside that
        sub-folder. Files located directly in ``dir_path`` are ignored.
    """
    per_folder = []
    for sub_dir in sorted(os.listdir(dir_path)):
        sub_dir_path = os.path.join(dir_path, sub_dir)
        if not os.path.isdir(sub_dir_path):
            continue
        # A fresh list per sub-folder, so entries never share or accumulate names
        per_folder.append(sorted(
            name for name in os.listdir(sub_dir_path)
            if os.path.isfile(os.path.join(sub_dir_path, name))
        ))
    return per_folder


# Folder whose sub-folders hold the downloaded files
dir_path = 'EuropeanaObjectTurtle/test/Creator/'
if os.path.isdir(dir_path):
    list_file = files_per_subfolder(dir_path)
else:
    print(f'{dir_path} does not exist, nothing to count')
    list_file = []
# Flat list of all file names, and the number of files in each sub-folder
list_sub_file = [name for names in list_file for name in names]
number_file_in_folder = [len(names) for names in list_file]

print(list_file)
print('--------')
print(number_file_in_folder)
print('--------')
# Number of sub-folders that were inspected
print(len(number_file_in_folder))




In [None]:
# Google Colab only: mount Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Endless loop that keeps this cell running until the kernel is interrupted
# (presumably to keep the hosted session alive; confirm intent).
# Sleeping between iterations avoids pinning a CPU core the way a bare `pass` loop does.
import time
while True:
    time.sleep(60)