Word analysis, by domain, on the Common Crawl data set for the purpose of finding industry trends
Switch branches/tags
Nothing to show
Clone or download
Latest commit 704a8ed Jul 16, 2018
Permalink
Failed to load latest commit information.
data Add files via upload Jul 16, 2018
snapshot Add files via upload Apr 25, 2017
README.md Added additional comments Feb 13, 2018
command line.txt Added additional comments Feb 13, 2018
export06.csv Add files via upload Jul 16, 2018

README.md

KeywordAnalysis

Word analysis, by domain, on the Common Crawl data set for the purpose of finding industry trends


Process

Specific Domain Data Capturing

Common Crawl NetApp data capturing (New Index - after 2013)

  1. Start one EC2 m4.xlarge instance with 30GB SSD volume
  2. SSH to the instance with user: ec2-user
sudo yum -y install git; 
git clone https://github.com/CI-Research/cdx-index-client
cd cdx-index-client
sudo pip install -r requirements.txt
./cdx-index-client.py -c CC-MAIN-2016-30 *.netapp.com --json
cat domain-* > CC-MAIN-2016-30_July_Netapp
cd ~
git clone https://github.com/CI-Research/CommonCrawlDocumentDownload
cd CommonCrawlDocumentDownload
sudo yum install java-1.8.0-openjdk-devel
sudo update-alternatives --config java
./gradlew check
cp ~/cdx-index-client/CC-MAIN-2016-30_July_Netapp ~/CommonCrawlDocumentDownload
mv CC-MAIN-2016-30_July_Netapp commoncrawl-CC-MAIN.txt
nohup ./gradlew downloadDocuments
  1. Uploade data to S3
cd ~/download
aws configure
aws s3 sync . s3://CommonCrawl/data/NetApp/CC-MAIN-2016-30_July_Netapp/

Wget NetApp data capturing (This is optional)

Run wget from laptop Linux virtual machine

wget -r -nc -np "http://www.netapp.com/"
FINISHED --2017-04-28 05:03:37--
Total wall clock time: 9h 14m 12s
Downloaded: 10255 files, 1.1G in 4h 51m 33s (67.6 KB/s)
after zip/unzip: 10,000 files, 1,429 folders, 1.11GB
zip -r NetApp_April_2017 www.netapp.com/
find . -type f -exec cat {} + > Netapp_April_2017.txt

Upload files to AWS S3 bucket for use.

Remove html tags

  1. Start 1 nodes AWS EMR (Advance config: Hadoop only, Network "EC2-Classic", Master "m1.large", Core "0")
  2. SSH to the instance: ec2-54-90-80-85.compute-1.amazonaws.com (change) user: hadoop
  3. sudo yum install -y git
  4. wget http://www-eu.apache.org/dist/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gz
  5. tar zxvf apache-maven-3.5.2-bin.tar.gz
  6. sudo vi .bashrc
export MAVEN_HOME=/home/hadoop/apache-maven-3.5.2
export M2_HOME=/home/hadoop/apache-maven-3.5.2
export M2=/home/hadoop/apache-maven-3.5.2
export PATH=/home/hadoop/apache-maven-3.5.2/bin:$PATH
  1. source .bashrc
  2. git clone https://github.com/dkpro/dkpro-c4corpus
  3. aws s3 sync s3://CommonCrawl/data/NetApp/CC-MAIN-2016-30_July_Netapp/ /var/tmp/CC-MAIN-2016-30_July_Netapp 9a. aws s3 sync s3://CommonCrawl/data/StackOverflow/ /var/tmp/StackOverflow/
  4. run the dkpro-c4corpus-boilerplate project using the below 2 commands::
cd dkpro-c4corpus/dkpro-c4corpus-boilerplate/
mvn package
  1. mkdir /var/tmp/boiler
  2. Create script to process the file:: (replace name of the directory CC-MAIN to the one you are working on). For StackOverflow i used "/var/tmp/StackOverflow/*;"
vi boiler.sh
#!/bin/bash
for filename in /var/tmp/CC-MAIN*/*; do
    java -jar target/dkpro-c4corpus-boilerplate-1.0.1-SNAPSHOT.jar "$filename" "/var/tmp/boiler/$(basename "$filename" .txt)" false  
done
  1. chmod +x boiler.sh
  2. nohup ./boiler.sh
  3. cd /var/tmp/boiler
  4. aws s3 sync . s3://CommonCrawl/boilerplate/netapp/CC-MAIN-2016-30_July_Netapp/

Wordcount process

spark-shell

IBM Wordcount process:

val file = sc.textFile("s3://CommonCrawl/ibm_boiler")
val counts = file.flatMap(line => line.toLowerCase().replace(".", " ").replace(",", " ").split(" ")).map(word => (word, 1L)).reduceByKey(_ + _)
val sorted_counts = counts.collect().sortBy(wc => -wc._2)   // 1mins
sc.parallelize(sorted_counts.take(60000)).saveAsTextFile("s3://CommonCrawl/boilerplate/ibm_boiler _top60000")
sc.parallelize(sorted_counts).saveAsTextFile("s3://CommonCrawl/boilerplate/wordcount-ibm_bolier")

Netapp Wordcount process:

val file = sc.textFile("s3://CommonCrawl/boilerplate/netapp_boiler")
val counts = file.flatMap(line => line.toLowerCase().replace(".", " ").replace(",", " ").split(" ")).map(word => (word, 1L)).reduceByKey(_ + _)
val sorted_counts = counts.collect().sortBy(wc => -wc._2)    // 3mins
sc.parallelize(sorted_counts.take(20000)).saveAsTextFile("s3://CommonCrawl/top20000_netapp_boiler")
sc.parallelize(sorted_counts).saveAsTextFile("s3://CommonCrawl/wordcount-netapp_boiler")

Top 10 words

Word Count
4327791
the 2103578
0 1159355
to 1097568
and 1057336
a 856529
of 811647
for 737729
in 646580
ibm 623663

Dataframe, Dataset, Data source

Convert Text to Parquet, Spark 2.0 convert into parquet file in much more efficient than spark1.6.

[hadoop@ip-10-0-1-27 ~]$ aws s3 cp s3://CommonCrawl/netapp_boiler_top20000_np.csv /var/tmp
[hadoop@ip-10-0-1-27 ~]$ hdfs dfs -put /var/tmp/netapp_boiler_top20000_np.csv /user/hadoop/

Spark 1.4+

spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/netapp_boiler_top20000_np.csv")
val selectedData = df.select("words", "count")
selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("netappparquet.csv")

Remove Stop Words

The steps are

  1. remove punctuation, by replace "[^A-Za-z0-9\s]+" with "", or not include numbers "[^A-Za-z\s]+"
  2. trim all spaces
  3. lower all words
  4. remove stop words
aws s3 cp s3://CommonCrawl/netapp_boiler_top20000.txt /var/tmp
hdfs dfs -mkdir /user/hadoop/data/
hdfs dfs -put /var/tmp/netapp_boiler_top20000.txt /user/hadoop/data/
spark-shell
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.functions.split

// val reg = raw"[^A-Za-z0-9\s]+" // remove punctuation with numbers

val reg = raw"[^A-Za-z\s]+" // remove punctuation not include numbers
val lines = sc.textFile("/user/hadoop/netapp_boiler_top20000_np.csv").map(_.replaceAll(reg, "").trim.toLowerCase).toDF("line")
val words = lines.select(split($"line", " ").alias("words"))
val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
val noStopWords = remover.transform(words)
remover.transform(words).show(15)

alt text


//val counts = noStopWords.select(explode($"filtered")).map(word =>(word, 1)).reduceByKey(_+_)

// from word -> num to num -> word
//val mostCommon = counts.map(p => (p._2, p._1)).sortByKey(false, 1)

//mostCommon.take(5)

//dataframe dump to csv
val stringify = udf((vs: Seq[String]) => s"""[${vs.mkString(",")}]""")
words.withColumn("words", stringify($"words")).write.csv("/netapp_filtered.csv")
hdfs dfs -get /netapp_filtered.csv .

Machine Learning Pipeline: TF-IDF and K-Means

Introducing the TF-IDF method for vectorizing a "bag of words"

TF: "Term Frequency"

  • normalized for the length of the document
  • hashed into a fixed-length set of buckets ("the hashing trick") so that we don't have an extremely high number of dimensions (count of all distinct tokens)
  • downside: there will be some hash collisions, where unrelated words get mapped to the same "dimension"

IDF: "Inverse Document Frequency"

  • Normalize word counts based on how frequently a word occurs in the corpus.
  • Logarithmic transformation so that words which occur in literally every document (100% or 1.0) get weighted down to 0 (ln 1)
  • Rare words are weighted heavily
  • Helpful where rare, technical vocabulary constitutes distinguishing features

In spark 2.0, Spark has made csv a built-in source. We can create Dataframes from csv file.

sudo yum install -y git

git clone https://github.com/phatak-dev/spark-two-migration

aws s3 cp s3://CommonCrawl/netapp_boiler_top20000_np.csv /var/tmp

hdfs dfs -put /var/tmp/netapp_boiler_top20000_np.csv /user/hadoop/

spark-shell

import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val sqlContext = new SQLContext(sc)
val netappDF = sqlContext.read.format("csv").option("header", "true").load("netapp_boiler_top20000_np.csv")
netappDF.columns
netappDF.show(15)
netappDF.printSchema()
netappDF.select("count").show()
netappDF.select($"words", $"count").show()
netappDF.filter($"count" > 10000).show()
netappDF.groupBy("count").count().show()
netappDF.groupBy("words").count().show()
//try sql query to display specific word
//netappDF.createOrReplaceTempView("netappsql")
//val sqlDF = spark.sql("SELECT words, count FROM netappsql WHERE words = 'database'".show(20) 

Lower case the text:

val netappLoweredDF = netappDF.select($"*", lower($"words").as("lowerText"))
netappLoweredDF.show(2)

Set up the ML Pipeline:

import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, HashingTF, IDF, Normalizer}

Step 1: Natural Language Processing: RegexTokenizer: Convert the lowerText col to a bag of words

val tokenizer = new RegexTokenizer().setInputCol("lowerText").setOutputCol("netappwords").setPattern("""\W+""")
val netappWordsDF = tokenizer.transform(netappLoweredDF.na.drop(Array("lowerText")))
netappWordsDF.printSchema
netappWordsDF.select("netappwords").first

Step 2: Natural Language Processing: StopWordsRemover: Remove Stop words

val remover = new StopWordsRemover().setInputCol("netappwords").setOutputCol("noStopWords")
val noStopWordsListDF = remover.transform(netappWordsDF)
noStopWordsListDF.printSchema
noStopWordsListDF.select("words", "count", "netappwords", "noStopWords").show(20)
noStopWordsListDF.show(15)

alt text

alt text

Step 3: HashingTF// More features = more complexity and computational time and accuracy

val hashingTF = new HashingTF().setInputCol("noStopWords").setOutputCol("hashingTF").setNumFeatures(20000)
val featurizedDataDF = hashingTF.transform(noStopWordsListDF)
featurizedDataDF.printSchema
featurizedDataDF.select("words", "count", "netappwords", "noStopWords").show(7)

Step 4: IDF// This will take 30 seconds or so to run

val idf = new IDF().setInputCol("hashingTF").setOutputCol("idf")
val idfModel = idf.fit(featurizedDataDF)

Step 5: Normalizer// A normalizer is a common operation for text classification. // It simply gets all of the data on the same scale... for example, if one article is much longer and another, it'll normalize the scales for the different features. // If we don't normalize, an article with more words would be weighted differently

val normalizer = new Normalizer().setInputCol("idf").setOutputCol("features")

Step 6: k-means & tie it all together...

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans().setFeaturesCol("features").setPredictionCol("prediction").setK(8).setSeed(0) 
// for reproducability
val pipeline = new Pipeline().setStages(Array(tokenizer, remover, hashingTF, idf, normalizer, kmeans))
// This can take more 1 hour to run!/*
val model = pipeline.fit(netappLoweredDF.na.drop(Array("lowerText")))
*/

Prediction

aws s3 cp s3://CommonCrawl/ibm_boiler_top60000.csv /var/tmp

hdfs dfs -put /var/tmp/ibm_boiler_top60000.csv /user/hadoop/

// val model2 = org.apache.spark.ml.PipelineModel.load("netapp_boiler_top20000_np.csv")
// val model2 = org.apache.spark.ml.PipelineModel.load("saves_parquet.csv")
// input path error

Let's take a look at a sample of the data to see if we can see a pattern between predicted clusters and titles.

val rawPredictionsDF = model.transform(netappLoweredDF.na.drop(Array("lowerText")))
rawPredictionsDF.columns
rawPredictionsDF.show(10)
val predictionsDF = rawPredictionsDF.select($"words", $"prediction").cache
predictionsDF.show(15)
// This could take up to 5 minutes.predictionsDF.groupBy("prediction").count().orderBy($"count" desc).show(100)
display(predictionsDF.filter("prediction = 3").select("words", "prediction").limit(30))
display(predictionsDF.filter("prediction = 4").select("words", "prediction").limit(30))
display(predictionsDF.filter("prediction = 2").select("words", "prediction").limit(30))
predictionsDF.filter($"title" === "Apache Spark").show(10)
display(predictionsDF.filter("prediction = 25").limit(25))

alt text

alt text

alt text