In [1]:
elastic_hosts = [] #elastic cluster instance credentials
parquet_file = 'CommitFile.parquet'

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from elasticsearch import Elasticsearch as es
import re

In [3]:
import urllib3
urllib3.disable_warnings()

In [4]:
hadoop_aws_ver = '3.3.2'

sparkConf = SparkConf() \
  .setAppName('simge-apollo') \
  .setMaster('') \ #spark master IP
  .set('spark.executor.heartbeatInterval', '6000s') \
  .set('spark.network.timeout', '7200s') \
  .set('spark.executor.memory', '2500m') \
  .set('spark.driver.memory', '512m') \
  .set('spark.driver.memoryOverhead', '512m') \

sc = SparkContext(conf=sparkConf)

spark = SparkSession(sc)

#spark = SparkSession \
#    .builder \
#    .appName("apollo-simge") \
#    .config("spark.jars.packages", f"org.apache.hadoop:hadoop-aws:{hadoop_aws_ver},org.apache.hadoop:hadoop-common:{hadoop_aws_ver},org.apache.hadoop:hadoop-client:{hadoop_aws_ver}") \
#    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain') \
#    .config('spark.driver.extraJavaOptions', '-Dio.netty.tryReflectionSetAccessible=true') \
#    .config('spark.executor.extraJavaOptions', '-Dio.netty.tryReflectionSetAccessible=true') \
#    .config('spark.driver.memory', '8G') \
#    .getOrCreate()

#spark = SparkSession \
#    .builder \
#    .appName("apollo-simge") \
#    .config("spark.jars.packages", f"org.apache.hadoop:hadoop-aws:{hadoop_aws_ver},org.apache.hadoop:hadoop-common:{hadoop_aws_ver},org.apache.hadoop:hadoop-client:{hadoop_aws_ver}") \
#    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain') \
#    .config('spark.driver.memory', '8G') \
#    .config('spark.executor.heartbeatInterval', '6000s') \
#    .config('spark.network.timeout', '7200s') \
#    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/07 19:05:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df = spark.read.parquet(parquet_file) #filepath

                                                                                

In [6]:
rdd = df.rdd

def fixup_lang(lang):
    lang = lang.lower()
    return lang if lang != 'h' else 'c'

def mapper(func):
    return lambda row: (fixup_lang(row['language']), (func(row), 1, row['fileName'], row['_id']))

def discardFilename(row):
    return (row[0], row[1][0:2])

def discardFileCount(row):
    return (row[0], (row[1][0], row[1][2]))

def reducePair(x, y):
    return (x[0] + y[0], x[1] + y[1])

def getAnalytics(specific_mapper_func):
    result = {}
    
    mappedRdd = rdd.map(mapper(specific_mapper_func))
    
    fileCount = mappedRdd.map(discardFilename).reduceByKey(reducePair)
    result['fileCount'] = fileCount.collectAsMap()
    
    average = lambda x: (x[0], x[1][0] / x[1][1])
    
    averagePerFile = fileCount.map(average)
    result['averagePerFile'] = averagePerFile.collectAsMap()
    
    getMax = lambda x, y: x if x[0] > y[0] else y
    getMin = lambda x, y: y if x[0] > y[0] else x
    
    maxFile = mappedRdd.map(discardFileCount).reduceByKey(getMax)
    result['maxFile'] = maxFile.collectAsMap()
    
    minFile = mappedRdd.map(discardFileCount).reduceByKey(getMin)
    result['minFile'] = minFile.collectAsMap()
    
    def valSubMeanSqr(x):
        stats = result['fileCount'][x[0]]
        avg = stats[0] / stats[1]
        cnt = stats[1]
        return x[0], ((x[1][0] - avg) ** 2) / cnt

    from math import sqrt
    stddev = mappedRdd.map(valSubMeanSqr).reduceByKey(lambda x, y: x + y).map(lambda x: (x[0], sqrt(x[1])))
    result['stddev'] = stddev.collectAsMap()

    return result, mappedRdd

In [10]:
def analyticsToDF(result_dict):
    import pandas as pd
    ret = pd.DataFrame()
    for key, value in result_dict.items():
        ret[key] = value.items()
    return ret

In [8]:
wordCountAnalytics, _ = getAnalytics(
    lambda x: len([word for word in x['content'].split() if any(char.isalnum() for char in word)]))

                                                                                

In [11]:
analyticsToDF(wordCountAnalytics)

Unnamed: 0,fileCount,averagePerFile,maxFile,minFile,stddev
0,"(go, (120720809, 145481))","(go, 829.8046411558897)","(go, (149981, mattermost/mattermost-server/ven...","(go, (4, kubernetes/kubernetes/vendor/github.c...","(go, 3077.4473765876473)"
1,"(java, (99082288, 285483))","(java, 347.06896032338176)","(java, (92630, ReactiveX/RxJava/src/main/java/...","(java, (1, JetBrains/intellij-community/java/j...","(java, 783.8430423631291)"
2,"(json, (55995767, 123717))","(json, 452.61174292942763)","(json, (92513, plotly/plotly.js/test/image/moc...","(json, (0, envoyproxy/envoy/test/common/json/j...","(json, 2295.8650597705678)"
3,"(c++, (80753408, 102855))","(c++, 785.1189344222449)","(c++, (76290, MarlinFirmware/Marlin/Marlin/src...","(c++, (1, CRYTEK/CRYENGINE/Code/CryEngine/CryA...","(c++, 1887.681692027397)"
4,"(c, (176239271, 186286))","(c, 946.0682552634122)","(c, (158539, arangodb/arangodb/arangod/IResear...","(c, (0, coolsnowwolf/lede/package/lean/mt/driv...","(c, 2644.7094094118743)"
5,"(javascript, (116910771, 278657))","(javascript, 419.5508133655354)","(javascript, (160155, octobercms/october/modul...","(javascript, (0, webpack/webpack/test/statsCas...","(javascript, 2303.2706308001707)"
6,"(markdown, (64364783, 108189))","(markdown, 594.9290870606069)","(markdown, (108190, jgm/pandoc/changelog.md))","(markdown, (0, markdown-it/markdown-it/benchma...","(markdown, 1916.1295626313513)"
7,"(python, (54311104, 89750))","(python, 605.1376490250697)","(python, (59905, python/cpython/Lib/pydoc_data...","(python, (0, pytorch/pytorch/caffe2/python/bui...","(python, 1369.0119376268153)"
8,"(yaml, (8403805, 28588))","(yaml, 293.9626766475444)","(yaml, (74235, kubernetes/kops/addons/promethe...","(yaml, (1, JetBrains/intellij-community/plugin...","(yaml, 1806.4060698982705)"
9,"(rust, (7384468, 22194))","(rust, 332.7236189961251)","(rust, (65560, servo/servo/components/script/u...","(rust, (1, rust-lang/rust/src/test/rustdoc/iss...","(rust, 1028.0239093870887)"


In [12]:
lineCountAnalytics, _ = getAnalytics(
    lambda x: len([line for line in x['content'].split('\n') if line]))

                                                                                

In [13]:
analyticsToDF(lineCountAnalytics)

Unnamed: 0,fileCount,averagePerFile,maxFile,minFile,stddev
0,"(go, (36546270, 145481))","(go, 251.20991744626446)","(go, (122875, mattermost/mattermost-server/ven...","(go, (2, golang/go/test/dwarf/dwarf.dir/z10.go))","(go, 1105.8536246199292)"
1,"(java, (32762612, 285483))","(java, 114.7620418728961)","(java, (25011, apache/hadoop/hadoop-yarn-proje...","(java, (1, JetBrains/intellij-community/platfo...","(java, 276.83331562962445)"
2,"(json, (32540768, 123717))","(json, 263.0258412344302)","(json, (58290, pingcap/tidb/planner/core/testd...","(json, (1, apache/spark/python/test_support/sq...","(json, 1434.0850371159393)"
3,"(c++, (28517563, 102855))","(c++, 277.25986096932576)","(c++, (23485, arangodb/arangodb/3rdParty/V8/v7...","(c++, (1, CRYTEK/CRYENGINE/Code/CryEngine/CryA...","(c++, 641.0618527311872)"
4,"(c, (53561635, 186286))","(c, 287.52367327657475)","(c, (41188, ruby/ruby/enc/unicode/12.1.0/name2...","(c, (1, arangodb/arangodb/3rdParty/boost/1.71....","(c, 735.8978195100211)"
5,"(javascript, (40262479, 278657))","(javascript, 144.4875922729377)","(javascript, (59175, keystonejs/keystone-class...","(javascript, (1, prettier/prettier/tests/js/cu...","(javascript, 756.8072002834125)"
6,"(markdown, (11268062, 108189))","(markdown, 104.15164203384818)","(markdown, (16849, apachecn/AiLearning/docs/tf...","(markdown, (1, qianguyihao/Web/13-前端面试/03-面试题积...","(markdown, 290.9212870230607)"
7,"(python, (16713619, 89750))","(python, 186.22416713091923)","(python, (22430, psf/black/profiling/list_huge...","(python, (0, pytorch/pytorch/caffe2/python/bui...","(python, 413.34188371789793)"
8,"(yaml, (2782421, 28588))","(yaml, 97.32828459493494)","(yaml, (23645, Kong/kong/spec/fixtures/burst.y...","(yaml, (1, microsoft/winget-cli/src/AppInstall...","(yaml, 550.3695080183901)"
9,"(rust, (2661591, 22194))","(rust, 119.92389835090565)","(rust, (10414, rust-lang/rust/src/test/ui/issu...","(rust, (1, rust-lang/rust/src/test/ui/generics...","(rust, 329.913186555851)"


In [12]:
def to_dict(x):
    ret = {}
    sp = re.split('[^a-zA-Z0-9_]+', x)
    for word in sp:
        if not word: continue
        if len(word) > 2000: continue
        if word in ret: ret[word] = ret[word] + 1
        else: ret[word] = 1
    return ret

wordCountPerFileMap = rdd.map(
    lambda x: (fixup_lang(x['_id']), (to_dict(x['content']), fixup_lang(x['language']), x['fileName'])))

wordCountPerLangMap = rdd.map(
    lambda x: (fixup_lang(x['language']), (to_dict(x['content']), x['_id'])))

In [13]:
esc = es(hosts=elastic_hosts, verify_certs=False)

indexWithoutFile = {
    'settings': {
        'number_of_shards': 20,
        'number_of_replicas': 0,
    },
    'mappings': {
        '_source': {
            'enabled': False
        },
        'properties': {
            'language': { 
                'type': 'keyword'
            },
            'info': {
                'properties': {
                    'word': {
                        'type': 'keyword'
                    },
                    'wordCount': {
                        'type': 'long'
                    },
                    'wordLength': {
                        'type': 'long'
                    }
                }
            }
        }
    }
}

indexWithFile = indexWithoutFile.copy()

indexWithFile['mappings']['properties']['fileName'] = {
    'type': 'text',
    'fields': {
        'keyword': {
            'type': 'keyword'
        }
    }
}

  **transport_kwargs,
  **transport_kwargs,
  **transport_kwargs,


In [14]:
esc.indices.delete(index='apollo_word-frequency-per-file', ignore=[400, 404])
esc.indices.create(index='apollo_word-frequency-per-file', body=indexWithFile)

  """Entry point for launching an IPython kernel.
  


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'apollo_word-frequency-per-file'})

In [16]:
from elasticsearch import helpers

def sendToElasticBatch(rows):
    esc = es(
        hosts=elastic_hosts,
        verify_certs=False,
        request_timeout=600,
        max_retries=10,
        retry_on_timeout=True)
    
    def generator():
        for row in rows:
            for word, count in row[1][0].items():
                yield {
                    '_index': 'apollo_word-frequency-per-file',
                    '_id': row[0] + word,
                    '_source': {
                        'language': row[1][1],
                        'fileName': row[1][2],
                        'info': {
                            'word': word,
                            'wordCount': count,
                            'wordLength': len(word)
                        }
                    }
                }
    for _, _ in helpers.streaming_bulk(esc, generator(), chunk_size=500, max_retries=9000000, request_timeout=7200, initial_backoff=10, max_backoff=30, ignore_status=[400]):
        pass
    
wordCountPerFileMap.foreachPartition(sendToElasticBatch)

                                                                                

In [17]:
def sumWordCount(x, y):
    for word, val in y[0].items():
        if word in x[0]: x[0][word] += val
        else: x[0][word] = val
    return x

def filterLowFreq500(x):
    filtered_words = {word: count for word, count in x[1][0].items() if count > 500} # x[1] is words dictionary
    return (x[0], filtered_words)

def filterLowFreq2000(x):
    filtered_words = {word: count for word, count in x[1][0].items() if count > 2000} # x[1] is words dictionary
    return (x[0], filtered_words)

In [None]:
wordCountPerLang500 = wordCountPerLangMap.reduceByKey(sumWordCount).map(filterLowFreq500).collectAsMap()

In [18]:
wordCountPerLang2000 = wordCountPerLangMap.reduceByKey(sumWordCount).map(filterLowFreq2000).collectAsMap()

                                                                                

In [None]:
esc.indices.delete(index='apollo_word-precalculated-frequency-per-lang')
esc.indices.create(index='apollo_word-precalculated-frequency-per-lang', body=indexWithoutFile)

In [None]:
from elasticsearch import helpers

def sendToElasticBatch(rows):
    esc = es(
        hosts=elastic_hosts,
        verify_certs=False,
        request_timeout=600,
        max_retries=100,
        retry_on_timeout=True)
    
    def generator():
        for row in rows.items():
            for word, count in row[1].items():
                yield {
                    '_index': 'apollo_word-precalculated-frequency-per-lang',
                    'language': row[0],
                    'info': {
                        'word': word,
                        'wordCount': count,
                        'wordLength': len(word)
                    }
                }
    helpers.bulk(esc, generator(), chunk_size=500, max_retries=9000000, initial_backoff=10, max_backoff=120)
    
sendToElasticBatch(wordCountPerLang500)

In [15]:
esc.indices.delete(index='apollo_word-filtered-frequency-per-file', ignore=[400, 404])
esc.indices.create(index='apollo_word-filtered-frequency-per-file', body=indexWithFile)

  """Entry point for launching an IPython kernel.
  


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'apollo_word-filtered-frequency-per-file'})

In [19]:
# returns true for words that appear > 1000 times per language, filtering others
def filterInsignificant(x):
    lang = x[1][1]
    words = x[1][0]
    filtered_words = {word: count for word, count in words.items() if word in wordCountPerLang2000[lang]}
    return (x[0], filtered_words, x[1][1], x[1][2])
    
wordCountPerFileFilteredMap = wordCountPerFileMap.map(filterInsignificant)

In [20]:
from elasticsearch import helpers

def sendToElasticBatch(rows):
    esc = es(
        hosts=elastic_hosts,
        verify_certs=False,
        request_timeout=600,
        max_retries=100,
        retry_on_timeout=True)
    
    def generator():
        for row in rows:
            for word, count in row[1].items():
                yield {
                    '_index': 'apollo_word-filtered-frequency-per-file',
                    '_id': row[0] + word,
                    '_source': {
                    'language': row[2],
                    'fileName': row[3],
                        'info': {
                            'word': word,
                            'wordCount': count,
                            'wordLength': len(word)
                        }
                    }
                }
    for ok, response in helpers.streaming_bulk(esc, generator(), chunk_size=500, request_timeout=7200, max_retries=9000000, initial_backoff=10, max_backoff=120, ignore_status=[400]):
        pass
            
wordCountPerFileFilteredMap.foreachPartition(sendToElasticBatch)

                                                                                

In [23]:
esc.indices.delete(index='apollo_line-count-per-file', ignore=[400, 404])
esc.indices.create(index='apollo_line-count-per-file', body={
    'settings': {
        'number_of_shards': 20,
        'number_of_replicas': 2,
    },
    'mappings': {
        '_source': {
            'enabled': False
        },
        'properties': {
            'language': {
                'type': 'keyword'
            },
            'fileName': {
                'type': 'text',
                'fields': {
                    'keyword': {
                        'type': 'keyword'
                    }
                }
            },
            'info': {
                'properties': {
                    'lineCount': {
                        'type': 'long'
                    }
                }
            }
        }
    }
})

  """Entry point for launching an IPython kernel.


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'apollo_line-count-per-file'})

In [25]:
from elasticsearch import helpers

def sendToElasticBatch(rows):
    esc = es(
        hosts=elastic_hosts,
        verify_certs=False,
        request_timeout=600,
        max_retries=100,
        retry_on_timeout=True)
    
    def generator():
        for row in rows:
            yield {
                '_index': 'apollo_line-count-per-file',
                '_id': row['_id'],
                '_source': {
                    'language': fixup_lang(row['language']),
                    'fileName': row['fileName'],
                    'info': {
                        'lineCount': len(list((l for l in row['content'].split('\n') if l)))
                    }
                }
            }
    for ok, response in helpers.streaming_bulk(esc, generator(), chunk_size=500, request_timeout=7200, max_retries=9000000, initial_backoff=10, max_backoff=120, ignore_status=[400]):
        pass

rdd.foreachPartition(sendToElasticBatch)

22/04/01 21:25:18 WARN HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 7255746 ms exceeds timeout 7200000 ms
22/04/01 21:25:18 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 7256040 ms exceeds timeout 7200000 ms
22/04/01 21:25:18 ERROR TaskSchedulerImpl: Lost executor 1 on 172.31.8.98: Executor heartbeat timed out after 7255746 ms
22/04/01 21:25:18 ERROR TaskSchedulerImpl: Lost executor 0 on 172.31.1.163: Executor heartbeat timed out after 7256040 ms
