In [1]:
import findspark
findspark.init()
import pyspark
import re
import json
import pysolr
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext
import sys

In [2]:
# Build (or reuse) the Spark session for this notebook: local master using
# the "local[*]" setting, registered under the application name below.
session_builder = SparkSession.builder.master('local[*]').appName("FreebasePeople")
spark = session_builder.getOrCreate()

In [3]:
# Freebase dump excerpt to process. The smaller excerpts are kept here as
# ready-to-uncomment alternatives.
# input_file = "freebase-head-1000000"
# input_file = "freebase-head-10000000"
input_file = "freebase-head-100000000"

# Load the excerpt as an RDD of text lines.
spark_context = spark.sparkContext
freebase = spark_context.textFile(input_file)

In [4]:
# Patterns selecting the four kinds of Freebase triples this notebook keeps.
# Each one matches "<subject ending in /g.… or /m.…> TAB <predicate> TAB …".
# They are raw strings so that regex escapes such as `\.` and `\t` reach the
# `re` module untouched; the previous non-raw literals relied on Python
# passing unknown escapes like `\/` through, which triggers an
# invalid-escape-sequence warning on current interpreters.

# English-language display name of a topic.
re_name = r'(/[gm]\..+\t<http://rdf\.freebase\.com/ns/type\.object\.name>\t".*"@en)'
# English-language alias of a topic.
re_alias = r'(/[gm]\..+\t<http://rdf\.freebase\.com/ns/common\.topic\.alias>\t".*"@en)'
# Date of birth of a person (any object value).
re_birth = r'(/[gm]\..+\t<http://rdf\.freebase\.com/ns/people\.person\.date_of_birth>\t)'
# Date of death of a deceased person (any object value).
re_death = r'(/[gm]\..+\t<http://rdf\.freebase\.com/ns/people\.deceased_person\.date_of_death>\t)'

In [None]:
# RDF decoration removed from every kept line: the Freebase namespace prefix,
# a "^^<datatype> ." or "@lang ." tail, angle brackets and double quotes.
# Written as a raw string so the regex escapes are not interpreted (and
# warned about) by the Python parser first.
_rdf_markup = r'(http://rdf\.freebase\.com/ns/)|(\^\^.*\.)|(@.*\.)|<|>|"'

# A line is kept when it matches any of the four triple patterns.
_person_patterns = (re_name, re_alias, re_birth, re_death)

# Result: an RDD of [subject, predicate, object] lists, one per distinct
# name / alias / birth / death triple.
people = (
    freebase
    .filter(lambda line: any(re.search(pattern, line) for pattern in _person_patterns))
    .distinct()
    .map(lambda line: re.sub(_rdf_markup, "", line))
    .map(lambda line: line.split('\t'))
)

In [None]:
# people.take(20)

In [None]:
# Every triple becomes a row of three nullable string columns; the object
# column additionally carries a "maxlength" metadata entry of 2048.
_object_field = StructField('object', StringType(), True, metadata = {"maxlength":2048})
schema = StructType([
    StructField('subject', StringType(), True),
    StructField('predicate', StringType(), True),
    _object_field,
])

In [None]:
def _triples_for(predicate):
    """Return a DataFrame of the `people` triples whose predicate contains `predicate`."""
    matching = people.filter(lambda triple: predicate in triple[1])
    return spark.createDataFrame(matching, schema)


# One DataFrame per kind of triple, all sharing the subject/predicate/object schema.
names = _triples_for("type.object.name")
aliases = _triples_for("common.topic.alias")
births = _triples_for("people.person.date_of_birth")
deaths = _triples_for("people.deceased_person.date_of_death")

In [None]:
# names.show()

In [None]:
# Name the tables: expose each DataFrame under a view name so the SQL query
# can join them. createOrReplaceTempView replaces the deprecated
# registerTempTable and keeps the cell safe to re-run.
for view_name, frame in (
    ("names", names),
    ("aliases", aliases),
    ("births", births),
    ("deaths", deaths),
):
    frame.createOrReplaceTempView(view_name)

In [6]:
# SQL entry point built on the session's SparkContext; the query cell below
# calls sql_context.sql(...) on the names/aliases/births/deaths tables.
# NOTE(review): SQLContext is reported as deprecated in Spark 3.x in favour of
# SparkSession.sql — confirm against the installed Spark version.
sql_context = SQLContext(spark.sparkContext)

In [None]:
# Lifespan assumed when only one of the two dates is known, in months.
# Expressed in months and applied with add_months so the fallback is exactly
# 100 calendar years; the earlier "100*365 days" arithmetic ignored leap days
# and landed a few weeks short.
ASSUMED_LIFESPAN_MONTHS = 100 * 12

# One row per (name, alias) of every subject that has a birth or a death date.
# A missing alias becomes '-'; a missing birth/death date is derived from the
# other date using the assumed lifespan.
sql = sql_context.sql("""
select names.object as name,
ifnull(aliases.object, '-') as alias,
ifnull(cast(births.object as date), add_months(cast(deaths.object as date), -{months})) as birth,
ifnull(cast(deaths.object as date), add_months(cast(births.object as date), {months})) as death
from names
left join births on names.subject = births.subject
left join deaths on names.subject = deaths.subject
left join aliases on names.subject = aliases.subject
where births.object is not null or deaths.object is not null
""".format(months=ASSUMED_LIFESPAN_MONTHS))

In [None]:
# sql.show(20)

In [None]:
# Output settings: `n` is the partition count handed to repartition() before
# writing, `result_files` is the directory the CSV output is saved to.
# Alternatives used with the smaller input excerpts:
#   n = 10,  result_files = "filteredPeople_1"
#   n = 100, result_files = "filteredPeople_10"
n, result_files = 1000, "filteredPeople_100"

In [None]:
# Spread the query result over n partitions and save it as CSV with a header
# row; the extra option sets the Hadoop "marksuccessfuljobs" flag to false.
csv_writer = (
    sql.repartition(n)
    .write
    .format('com.databricks.spark.csv')
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
)
csv_writer.save(result_files, header = 'true')

In [None]:
# from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

# sql.select(to_json(struct(*sql.columns)).alias("json"))\
#     .groupBy(spark_partition_id())\
#     .agg(collect_list("json").alias("json_list"))\
#     .select(col("json_list").cast("string"))\
#     .write.json("path_json")

In [None]:
from os import listdir
import os
from os.path import isfile, join

# Move the CSV part files written by Spark into "<result_files>_csv",
# shortening each name to its leading "<word>-<word>" prefix plus ".csv".
readPath = result_files
writePath = result_files + '_csv'
# exist_ok keeps the cell re-runnable once the target directory exists.
os.makedirs(writePath, exist_ok=True)

file_list = sorted(listdir(readPath))

# Leading "<word>-<word>" of a file name, optionally preceded by a dot.
reg = r'(\.[\w]+-[\w]+)|([\w]+-[\w]+)'

for i in file_list:
    print(i)
    filename, file_extension = os.path.splitext(i)
    # Only CSV part files are moved; checksum and marker files stay behind.
    if file_extension != '.csv':
        continue
    result = re.match(reg, filename)
    # A CSV file without the expected prefix is left untouched instead of
    # crashing on a failed match.
    if result is None:
        continue
    os.rename(
        os.path.join(readPath, i),
        os.path.join(writePath, result.group() + file_extension),
    )

In [None]:
from whoosh.index import *
from whoosh.fields import *
from whoosh.analysis import *
from whoosh.support.charset import accent_map
import os
import csv
from os import listdir
from os.path import isfile, join

In [None]:
# CSV part file that the indexing cell below opens and reads.
# The previous version of this cell also opened an unrelated 'goods.csv' and
# wrapped it in a csv.reader; both names were rebound by the indexing cell
# before ever being read, so that open only leaked a file handle (or failed
# when the file was missing) and has been dropped.
# NOTE(review): this path points at the "filteredPeople_1" output — confirm it
# matches the result_files setting used for the Spark export.
path = './filteredPeople_1_csv/part-00000.csv'

In [None]:
# Whoosh schema: the four CSV columns are indexed as text and stored so that
# search hits can display them.
# NOTE(review): this rebinds `schema`, which earlier in the notebook holds the
# Spark StructType — re-running the Spark cells afterwards needs that cell
# re-executed first.
schema = Schema(
    name = TEXT(stored=True),
    alias = TEXT(stored=True),
    birth = TEXT(stored=True),
    death = TEXT(stored=True),
)

if not os.path.exists("index"):
    os.mkdir("index")
# create_in returns the index object, so a separate open_dir is not needed.
ix = create_in("index", schema)

# Header row written by the Spark export; it must not be indexed as a person.
_csv_header = ['name', 'alias', 'birth', 'death']

writer = ix.writer()
try:
    # newline='' is the csv module's recommended way to open its input.
    with open(path, 'r', encoding = 'utf8', newline = '') as csv_file:
        data = csv.reader(csv_file)
        for i in data:
            # Skip blank/short rows and the header row.
            if len(i) < 4 or i[:4] == _csv_header:
                continue
            writer.add_document(name=i[0], alias=i[1], birth=i[2], death=i[3])
except Exception:
    # Drop the partial batch so a failed run does not leave the writer open.
    writer.cancel()
    raise
else:
    # Without the commit none of the added documents become searchable.
    writer.commit()
        
# for i in file_list:
# with open(path, 'r', encoding='utf-8') as file:
#     for line in file:
        
#         print(writer.add_document(content=line))

# writer.commit()

In [None]:
from whoosh.qparser import QueryParser
# Parse a sample query against the "name" field of the index and print the
# parsed query, the result summary and every hit.
qp = QueryParser("name", schema = ix.schema)
q = qp.parse("Mayu Fukunoue")
print(q)
with ix.searcher() as s:
    results = s.search(q)
    # Print the results object already obtained instead of searching again.
    print(results)
    for res in results:
        print(res)
        print('\n')

In [None]:
# Solr client for the core that the queries below are sent to.
SOLR_CORE_URL = 'http://localhost:8983/solr/freebase_people_100/'
mysolr = pysolr.Solr(SOLR_CORE_URL)

In [None]:
# mysolr.delete(q='*', commit=True)

In [None]:
# mysolr.add(file)

In [None]:
def _name_wildcard_query(fragment):
    """Build the Solr query string `name:*<fragment>*` for a name fragment."""
    return 'name:*' + fragment + '*'


# The two people to compare, looked up by a fragment of their name.
person_name_1 = 'Takahama'
person_name_2 = 'Louise'
query_1 = _name_wildcard_query(person_name_1)
query_2 = _name_wildcard_query(person_name_2)
result_1 = mysolr.search(query_1)
result_2 = mysolr.search(query_2)
list(result_1), list(result_2)

In [None]:
def _last_person(results, label):
    """Print every hit in `results` and return the fields of the last one.

    Each hit is read as (name, alias, birth, death), taking the first value
    of each field; name and alias are converted with str().

    Raises:
        LookupError: when `results` contains no documents. Failing here
            prevents later cells from hitting unbound names or silently
            reusing values left over from a previous run.
    """
    fields = None
    for doc in results:
        fields = (str(doc['name'][0]), str(doc['alias'][0]), doc['birth'][0], doc['death'][0])
        print(*fields)
    if fields is None:
        raise LookupError("Solr query " + label + " returned no documents")
    return fields


# As before, when a query has several hits the last one is the one compared.
person_1, alias_1, dob_1, dod_1 = _last_person(result_1, "result_1")
person_2, alias_2, dob_2, dod_2 = _last_person(result_2, "result_2")


In [None]:
def getDifference(dob_1, dod_1, dob_2, dod_2):
    """Tell whether two lifespans overlap.

    Returns True when the first person died on or after the second person's
    birth and was born on or before the second person's death; otherwise
    returns False. The values only need to support `>=` and `<=`.
    """
    return True if (dod_1 >= dob_2 and dob_1 <= dod_2) else False

In [None]:
# Report whether the two people could have met, i.e. whether their lifespans
# overlap. The message is in Slovak: "Persons X and Y could (not) have met."
meet = getDifference(dob_1, dod_1, dob_2, dod_2)
verdict = "sa mohli stretnut." if meet else "sa nemohli stretnut."
# "Osoby " now ends with a space so the first name is not glued to the word.
print("Osoby " + person_1 + " a " + person_2 + " " + verdict)