In [None]:
# This notebook uses the topic model built in a previous step to extract the top 20 terms
# for each topic, and to extract, for each topic, the 1000 most and the 100 least
# representative documents, with volume metadata (title, author, rights) joined in from
# the Hathifiles so the referenced volumes are easier to identify.

In [2]:
# Disable jedi-based tab completion in IPython. This only affects interactive
# autocompletion, not any computation in the notebook.
%config Completer.use_jedi = False

In [3]:
from pyspark.sql import SparkSession

In [4]:
from pyspark.ml.feature import CountVectorizerModel
from pyspark.ml.clustering import LocalLDAModel

In [5]:
from pyspark.sql.functions import col, udf, expr, arrays_zip, concat_ws, asc, desc, size
from pyspark.sql.types import ArrayType, StringType, FloatType, DoubleType

In [6]:
import numpy as np
import pandas as pd

In [7]:
import os
# Output directory for the CSV files written below; no error if it already exists.
os.makedirs('out', exist_ok=True)

In [8]:
# Spark session settings, kept in one dict so they are easy to review and tune.
# NOTE(review): with master("local[*]") the executors run inside the driver JVM,
# so spark.executor.memory probably has no separate effect here — confirm.
SPARK_CONF = {
    "spark.executor.memory": "128G",
    "spark.driver.memory": "128G",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    # "0" removes the limit on the total size of results collected to the
    # driver (e.g. by toPandas()).
    "spark.driver.maxResultSize": "0",
    "spark.kryoserializer.buffer.max": "512m",
    # Local scratch directory for shuffle/spill files.
    "spark.local.dir": "/htrc/temp",
}

spark_builder = SparkSession.builder.master("local[*]")
for conf_key, conf_value in SPARK_CONF.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()

In [9]:
# Load the LDA model saved by the earlier training step (directory 'topicmodel',
# relative to the working directory) and display its summary.
topic_model = LocalLDAModel.load('topicmodel')
topic_model

LocalLDAModel: uid=LDA_5fcbf393faae, k=20, numFeatures=262144

In [10]:
# Number of topics (k) in the model; the per-topic loops below iterate over it.
topic_model.getK()

20

In [11]:
# Load the fitted CountVectorizer. Its vocabulary maps the term indices reported
# by the LDA model back to term strings.
vocab_model = CountVectorizerModel.load('vocab')
vocab_model

CountVectorizerModel: uid=CountVectorizer_5c4241403ad4, vocabularySize=262144

In [12]:
# Plain Python list of terms on the driver, indexed by vocabulary position;
# index_map_udf looks terms up in it.
vocab = vocab_model.vocabulary

In [13]:
# One row per topic with the indices and weights of its 20 highest-weighted terms.
topics = topic_model.describeTopics(maxTermsPerTopic=20)
topics

DataFrame[topic: int, termIndices: array<int>, termWeights: array<double>]

In [14]:
def _term_indices_to_terms(term_indices):
    """Map a sequence of vocabulary indices to their term strings via `vocab`."""
    return [vocab[index] for index in term_indices]

# UDF: array<int> of vocabulary indices -> array<string> of terms.
index_map_udf = udf(_term_indices_to_terms, ArrayType(StringType()))

In [15]:
# One row per topic: its top terms joined into a single ", "-separated string.
# The term weights are not carried into this table.
topics_with_terms = topics.select(
    'topic',
    concat_ws(', ', index_map_udf(col('termIndices'))).alias('topic_terms'),
)
topics_with_terms

DataFrame[topic: int, topic_terms: string]

In [16]:
# Print every topic's term list; truncate=False keeps the long strings intact.
topics_with_terms.show(truncate=False)

+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic|topic_terms                                                                                                                                                                                            |
+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |united__states, cfr, national, housing, defense, designate, service, con, transportation, alien, corporation, loan, labor, commerce, appropriation, military, federal__fund, relative, education, bank |
|1    |alaska, river, map, washington, california, united__states, university, oil, issn, lake, ice, basin, russi, new__york, island, sea, county, geology, usa, florida

In [15]:
# Save the topic/term table. It has one row per topic, so collecting it to pandas is cheap.
topics_with_terms.toPandas().to_csv('out/topics_top20.csv', index=False)

In [18]:
# Volume-level metadata columns kept from the tab-separated Hathifiles dump.
# NOTE(review): Spark's CSV reader treats `"` as a quote character by default;
# if any title/author contains a stray double quote, fields may be mis-split —
# consider checking or overriding the `quote` option.
HATHIFILE_COLUMNS = ['htid', 'title', 'author', 'rights_date_used', 'access', 'rights']
hathifiles = (
    spark.read.csv('/htrc/shared/hathifiles/hathi_full_20210301.txt', sep='\t', header=True)
    .select(*HATHIFILE_COLUMNS)
)
hathifiles

DataFrame[htid: string, title: string, author: string, rights_date_used: string, access: string, rights: string]

In [19]:
def _vector_to_list(vector):
    """Convert a Spark ML vector to a plain list of Python floats.

    A null vector yields a null array instead of raising inside the UDF.
    """
    return None if vector is None else vector.toArray().tolist()

# DoubleType (not FloatType): toArray().tolist() yields double-precision values,
# and FloatType silently downcast the topic probabilities to float32.
to_array_udf = udf(_vector_to_list, ArrayType(DoubleType()))

In [20]:
# Per-document topic distributions from the earlier step, with the topic vector
# turned into a plain array and Hathifiles metadata joined on htid.
# NOTE(review): `_c1`/`_c2` look like default positional column names; they are
# renamed to htid and seq here — confirm against the step that wrote 'result'.
# The left join keeps every row of 'result' even when no metadata row matches.
result = (
    spark.read.parquet('result')
    .drop('tokens')
    .withColumn('topicDistribution', to_array_udf(col('topicDistribution')))
    .withColumnRenamed('_c1', 'htid')
    .withColumnRenamed('_c2', 'seq')
    .join(hathifiles, on='htid', how='left')
)
result.show()

+------------+--------+--------------------+--------------------+--------------------+----------------+------+------+
|        htid|     seq|   topicDistribution|               title|              author|rights_date_used|access|rights|
+------------+--------+--------------------+--------------------+--------------------+----------------+------+------+
|chi.14822710|00000197|[1.7056777E-4, 1....|The virus cancer ...|National Cancer I...|            1973| allow|    pd|
|chi.14822710|00000360|[1.8432748E-4, 1....|The virus cancer ...|National Cancer I...|            1973| allow|    pd|
|chi.14822710|00000236|[1.3893539E-4, 1....|The virus cancer ...|National Cancer I...|            1973| allow|    pd|
|chi.14822710|00000075|[3.7791772E-4, 3....|The virus cancer ...|National Cancer I...|            1973| allow|    pd|
|chi.14822710|00000050|[1.9619796E-4, 1....|The virus cancer ...|National Cancer I...|            1973| allow|    pd|
|chi.14822710|00000053|[1.513599E-4, 1.4...|The virus ca

In [26]:
# Inspect rows whose rights_date_used is the string "9999".
# NOTE(review): presumably a placeholder for an unknown date in the Hathifiles — confirm.
result.filter(col('rights_date_used') == "9999").show()

+-------------------+--------+--------------------+--------------------+--------------------+----------------+------+------+
|               htid|     seq|   topicDistribution|               title|              author|rights_date_used|access|rights|
+-------------------+--------+--------------------+--------------------+--------------------+----------------+------+------+
|       uc1.b5131420|00000046|[2.457953E-4, 2.4...|Endangered Specie...|United States. Co...|            9999| allow|    pd|
|       uc1.b5131420|00000073|[1.5442881E-4, 0....|Endangered Specie...|United States. Co...|            9999| allow|    pd|
|       uc1.b5131420|00000069|[1.5927281E-4, 0....|Endangered Specie...|United States. Co...|            9999| allow|    pd|
|       uc1.b5131420|00000392|[1.7120679E-4, 1....|Endangered Specie...|United States. Co...|            9999| allow|    pd|
|       uc1.b5131420|00000436|[2.1873614E-4, 2....|Endangered Specie...|United States. Co...|            9999| allow|    pd|


In [23]:
# Row count per rights_date_used value, in descending order. The column is a
# string, so ordering is lexicographic (fine for 4-digit years; "9999" comes first).
result.groupBy('rights_date_used').count().orderBy(desc('rights_date_used')).show(truncate=False)

+----------------+------+
|rights_date_used|count |
+----------------+------+
|9999            |11025 |
|2025            |4     |
|2018            |227   |
|2017            |869   |
|2016            |1253  |
|2015            |2983  |
|2014            |12604 |
|2013            |35778 |
|2012            |53815 |
|2011            |77435 |
|2010            |109714|
|2009            |151913|
|2008            |137304|
|2007            |179568|
|2006            |169179|
|2005            |210332|
|2004            |231914|
|2003            |233355|
|2002            |242176|
|2001            |248203|
+----------------+------+
only showing top 20 rows



In [19]:
# Expand the topicDistribution array into one column per topic (t0, t1, ...).
# A single select() replaces a withColumn() loop, which the PySpark docs warn can
# build very large query plans. Any existing t{i} columns are excluded first, so
# re-running this cell replaces them (as withColumn did) instead of duplicating them.
topic_cols = [f't{t}' for t in range(0, topic_model.getK())]
result = result.select(
    *[c for c in result.columns if c not in topic_cols],
    *[col('topicDistribution').getItem(t).alias(name) for t, name in enumerate(topic_cols)],
)

In [20]:
# Collect every column except the (now expanded) topic array to the driver as a
# pandas DataFrame. The whole dataset must fit in driver memory.
# NOTE: this rebinds `result` from a Spark DataFrame to a pandas DataFrame, so
# the Spark cells above cannot be re-run afterwards without first rebuilding
# `result` from the parquet source.
result = result.drop('topicDistribution').toPandas()

In [21]:
# Number of most- and least-representative documents exported per topic.
TOP_N_DOCS = 1000
BOTTOM_N_DOCS = 100

for t in range(0, topic_model.getK()):
    print(f'Working on topic {t}...')
    topic_col = f't{t}'
    # Select only the exported columns before ranking, so the other topic
    # columns are not copied for every topic.
    docs = result[['htid', 'seq', topic_col, 'rights_date_used', 'title', 'author', 'access', 'rights']]
    # nlargest/nsmallest avoid a full sort per topic, and they skip NaN scores.
    # sort_values(ascending=False) put NaN at the tail, i.e. inside the
    # "bottom" slice.
    top_docs = docs.nlargest(TOP_N_DOCS, topic_col)
    # Reverse so the bottom slice stays in descending order after the top slice.
    bottom_docs = docs.nsmallest(BOTTOM_N_DOCS, topic_col).iloc[::-1]
    docs_slice = pd.concat([top_docs, bottom_docs])
    # With fewer than TOP_N_DOCS + BOTTOM_N_DOCS rows the two slices overlap;
    # keep each original row only once.
    docs_slice = docs_slice[~docs_slice.index.duplicated(keep='first')]
    docs_slice.to_csv(f'out/{topic_col}.csv', index=False)

Working on topic 0...
Working on topic 1...
Working on topic 2...
Working on topic 3...
Working on topic 4...
Working on topic 5...
Working on topic 6...
Working on topic 7...
Working on topic 8...
Working on topic 9...
Working on topic 10...
Working on topic 11...
Working on topic 12...
Working on topic 13...
Working on topic 14...
Working on topic 15...
Working on topic 16...
Working on topic 17...
Working on topic 18...
Working on topic 19...
