# Scoring & Querying 4 weeks of data

---

## Scoring OPE

In [13]:
import json
import operator
import datetime
import itertools
import pyspark
from pyspark.sql import Row
import boto3


# In[3]:

# Start time of the measurement in progress; set by timerStart().
timer = None


def timerStart():
    '''Stores the current UTC time in the module-level 'timer'.'''
    global timer
    timer = datetime.datetime.utcnow()


def timerEnd():
    '''Prints the time elapsed since the most recent timerStart() call.'''
    elapsed = datetime.datetime.utcnow() - timer
    print(elapsed)

def scanOpeKeys(startKey, endKey):
    '''
    Generates object keys from the OPE bucket.

    Lists the bucket with Marker=startKey and yields keys in listing order,
    stopping at the first key that does not compare below 'endKey'.
    '''
    opeBucket = boto3.resource('s3').Bucket('***')
    for s3Object in opeBucket.objects.filter(Marker=startKey):
        if not s3Object.key < endKey:
            return
        yield s3Object.key

def fluff(flat):
    '''
    Expands a flat dict whose keys are '/'-separated paths into nested dicts.

    Example: {'a/b': 1, 'c': 2} becomes {'a': {'b': 1}, 'c': 2}.
    '''
    nested = {}
    for flatKey in flat:
        parts = flatKey.split('/')
        node = nested
        # Walk (creating as needed) one dict level per leading path part.
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = flat[flatKey]
    return nested

def readLocFile(f):
    '''
    Reads a local JSON file and returns the decoded data.

    'f' is the path of the file to read.  If the file holds a JSON string
    that itself contains JSON (double-encoded data), the inner document is
    decoded as well; otherwise the decoded top-level value is returned as is.
    '''
    # Open the path that was passed in; the context manager closes the
    # handle even when decoding fails.
    with open(f) as handle:
        parsed = json.load(handle)
    # type(u'') is the text type json produces on both Python 2 and 3.
    if isinstance(parsed, type(u'')):
        parsed = json.loads(parsed)
    return parsed

def readS3File(key):
    '''
    Fetches the S3 object stored under 'key', decodes it as UTF-8 and returns
    the parsed JSON.
    '''
    s3obj = boto3.resource('s3').Object(bucket_name='***', key=key)
    body = s3obj.get()['Body']
    try:
        art = body.read().decode('utf-8')
    finally:
        # Release the stream even if reading or decoding raises.
        body.close()
    return json.loads(art)

def bundleArticleData(fileLoc):
    '''
    Reads one flattened article from S3 (key 'fileLoc') and bundles it into an
    article Row.

    On success returns a Row with articleId, mediaType, title, author,
    publication, editorialRank, autoRank and sentences, where each sentence
    Row carries sentenceId, sentenceText and its mentions.

    Otherwise returns Row(failed=fileLoc, flag=...) with flag one of:
      'missingLNInfo'  -- publication unknown, media type neither 'news' nor
                          'blog', or neither rank found;
      'noSentences'    -- no sentence produced a mention;
      'parseSomething' -- an exception was raised while reading or parsing.
    '''
    try:
        flatDat = readS3File(fileLoc)
        art = fluff(flatDat)
        attrs = art.get('attrs')
        # -10 is the sentinel for "rank not present in attrs".
        editorialRank = -10
        autoRank = -10
        for k in attrs:
            attr = attrs.get(k)
            # The number is read after a fixed-width prefix (9 / 12 chars) --
            # presumably the tag name plus one separator; TODO confirm.
            if 'feedRank' in attr:
                autoRank = int(attr[9:])
            if 'source_rank' in attr:
                editorialRank = int(attr[12:])
        # The first 38 characters are dropped; the default is padded so a
        # missing publication comes out as exactly 'Unknown'.
        publication = art.get('publication', ('a' * 38 + 'Unknown'))[38:]
        mediaType = art.get('types', {}).get('0', 'Unknown')
        sentDict = art.get('sentences', {})
        sentences = []
        articleId = art.get('id')
        if (publication == 'Unknown'
                or mediaType not in ('news', 'blog')
                or (editorialRank == -10 and autoRank == -10)):
            return Row(failed=fileLoc, flag='missingLNInfo')
        for sentKey in sentDict:
            # Mentions in later sentences weigh less: 0.94 ** sentence number.
            decay = .94 ** int(sentKey)
            sent = sentDict.get(sentKey)
            themesDict = sent.get('themes')
            entDict = sent.get('entities')
            sentenceText = sent['yield']
            mentions = []
            if themesDict is not None:
                mentions.extend([themesDict[k]['attributes']['TEXT'] for k in themesDict])
            if entDict is not None:
                mentions.extend([entDict[k].get('normalform') for k in entDict])
            mentions = [Row(theme=m, preMentionScore=decay) for m in mentions]
            if mentions:
                sentences.append(Row(sentenceId='{}.{:04d}'.format(articleId, int(sentKey)),
                                     sentenceText=sentenceText, mentions=mentions))
        if sentences:
            return Row(articleId=articleId, mediaType=mediaType, title=art.get('title', 'Unknown'),
                       author=art.get('authorId', 'Unknown'), publication=publication,
                       editorialRank=editorialRank, autoRank=autoRank, sentences=sentences)
        return Row(failed=fileLoc, flag='noSentences')
    except Exception:
        # Any read/parse error marks the key as failed; KeyboardInterrupt and
        # SystemExit are deliberately left to propagate.
        return Row(failed=fileLoc, flag='parseSomething')


# In[4]:

def createTable(table, data, schema=None, samplingRatio=None):
    '''
    Builds a pyspark.sql data frame from 'data' (optionally with 'schema' and
    'samplingRatio'), registers it as temp table 'table' and returns it.
    '''
    frame = sqlContext.createDataFrame(data, schema, samplingRatio)
    frame.registerTempTable(table)
    return frame

def selectInto(table, sqlQuery):
    '''
    Executes 'sqlQuery' through the sqlContext, registers the resulting data
    frame as temp table 'table' and returns it.
    '''
    frame = sqlContext.sql(sqlQuery)
    frame.registerTempTable(table)
    return frame

def showTable(table, n=20, truncate=True, sqlEpilogue=''):
    '''
    Shows the rows of a registered pyspark.sql table by name.
    'sqlEpilogue' is appended verbatim to the 'select *' statement.
    '''
    statement = 'select * from {} {}'.format(table, sqlEpilogue)
    sqlContext.sql(statement).show(n=n, truncate=truncate)


def makeMention(**fields):
    '''Builds a mention Row from the given keyword fields.'''
    mention = Row(**fields)
    return mention

def makeMentions(themes, decay):
    '''Returns one mention per theme, each with preMentionScore set to 'decay'.'''
    mentions = []
    for theme in themes:
        mentions.append(makeMention(theme=theme, preMentionScore=decay))
    return mentions

def makeSentence(**fields):
    '''Builds a sentence Row from the given keyword fields.'''
    sentence = Row(**fields)
    return sentence

def text2sentences(articleId, articleText):
    '''
    Generates sentence Rows for the sentences Pez finds in 'articleText',
    skipping sentences without mentions.

    The mention weight is multiplied by 0.94 for every sentence seen, so the
    first sentence's mentions get 0.94.
    '''
    weight = 1.0
    pezSentences = pezCast.value.process_document(articleText)
    for position, pezSentence in enumerate(pezSentences):
        weight *= .94
        found = makeMentions(pezSentence['entities'] + pezSentence['concepts'], weight)
        if not found:
            continue
        yield makeSentence(
            sentenceId=u'{}.{:04d}'.format(articleId, position),
            sentenceText=pezSentence['text'],
            mentions=found)

def dict2article(article):
    '''
    Generates at most one article Row from a Moreover article dictionary.

    Nothing is generated when the language is not English, the media type is
    not News or Blog, or no sentence carries a mention.  Only the first 3000
    characters of the content are processed.
    '''
    articleId = article['id']
    if article.get('language', 'English') != 'English':
        return
    headline = article.get('title', u'Unknown')
    byline = article.get('author', {}).get('name', u'Unknown')
    bodyText = article.get('content', u'')[0:3000]
    sourceInfo = article.get('source', {})
    feedInfo = sourceInfo.get('feed', {})
    pubName = sourceInfo.get('name', u'Unknown')
    kind = feedInfo.get('mediaType', u'Unknown')
    acceptedKinds = {'News', 'Blog'}
    if kind not in acceptedKinds:
        return
    editorial = int(sourceInfo.get('editorialRank', 0))
    automatic = int(feedInfo.get('rank', {}).get('autoRank', 0))
    sentenceRows = list(text2sentences(articleId, bodyText))
    if not sentenceRows:
        return
    yield Row(articleId=articleId, mediaType=kind, title=headline, author=byline,
              publication=pubName, editorialRank=editorial, autoRank=automatic,
              sentences=sentenceRows)

def dicts2articles(dicts):
    '''Chains the articles generated by dict2article for every dict in 'dicts'.'''
    perDict = [dict2article(d) for d in dicts]
    return itertools.chain.from_iterable(perDict)


# ### From Test Dataset
# 
# One hour of moreover articles on:
# * content market\*
# * native ad\*
# * sponsored content
# 
# JQ command for selecting articles:
# 
# ```
# time cat moreover-2016-02-17T12/*/*/* | jq -c '.articles[] | select(has("content") and .language == "English") | select(.content | test("content\\s+market|native\\s+ad|sponsored\\s+content"; "i"))' > cm_articles.json
# ```

# In[6]:

def getOneTestArticle():
    '''
    Returns the parsed first line of 'cm_articles.json', or None when the file
    is empty.
    '''
    with open('cm_articles.json') as articleFile:
        firstLine = articleFile.readline()
    if firstLine:
        return json.loads(firstLine)
    return None


# In[7]:

def createTestArticles(outputPath):
    '''
    Builds the articles data frame from 'cm_articles.json' (one JSON article
    per line), writes it as parquet to 'outputPath', prints the elapsed time
    and returns the data frame.
    '''
    timerStart()
    parsedLines = sc.textFile('cm_articles.json').map(json.loads)
    articles = sqlContext.createDataFrame(parsedLines.flatMap(dict2article))
    articles.write.parquet(outputPath)
    timerEnd()
    return articles


# In[7]:

# articles = createTestArticles('test_articles.parquet').cache()


# In[8]:

# articles = sqlContext.read.parquet('test_articles.parquet').cache()


# ### From S3 Dataset

# In[8]:


def packet2articles(key):
    '''
    Downloads the JSON packet stored under S3 'key' and returns the articles
    generated from its 'articles' list (empty when the list is absent).
    '''
    print('Processing: {}'.format(key))
    packetObject = boto3.resource('s3').Object(bucket_name='***', key=key)
    rawJson = packetObject.get()['Body'].read().decode('utf-8')
    packet = json.loads(rawJson)
    return dicts2articles(packet.get('articles', list()))

def createS3Articles(startKey, endKey, outputPath):
    '''
    Builds the articles data frame from the S3 packets whose keys lie between
    'startKey' and 'endKey', writes it as parquet to 'outputPath', prints the
    elapsed time and returns the data frame.
    '''
    timerStart()
    # NOTE(review): scanMoreoverKeys is not defined in the visible part of this
    # file -- confirm it is provided elsewhere.
    packetKeys = list(scanMoreoverKeys(startKey, endKey))
    articleRows = sc.parallelize(packetKeys, 1000).flatMap(packet2articles)
    articles = sqlContext.createDataFrame(articleRows)
    articles.write.parquet(outputPath)
    timerEnd()
    return articles


# In[10]:

# articles = createS3Articles('2016/02/17', '2016/02/18', 's3://dkddata/2_18PezzedArts.parquet').cache()
#testArts = createS3Articles('2016/02/17/01/00/', '2016/02/17/01/01', 's3://dkddata/tmp_1month/test.parquet')
#0:02:02.870538 with 20 nodes
# In[9]:

# testRead = sqlContext.read.option("mergeSchema", "false").parquet('s3://dkddata/tmp_1month/test.parquet').cache()


# ## Score Mentions

# In[12]:

def mentionCount(article):
    '''Returns the sum of preMentionScore over every mention in every sentence of 'article'.'''
    total = 0
    everyMention = itertools.chain.from_iterable(
        sentence['mentions'] for sentence in article['sentences'])
    for mention in everyMention:
        total += mention['preMentionScore']
    return total

def countMentionsByPub(articles):
    '''
    Registers temp table 'mentionCountsByPub' with columns publication and
    preMentionScoreSum: the summed mentionCount of each publication's articles.
    '''
    scoreByArticle = articles.map(lambda art: (art.publication, mentionCount(art)))
    scoreByPub = scoreByArticle.reduceByKey(operator.add)
    createTable('mentionCountsByPub', scoreByPub, ['publication', 'preMentionScoreSum'])

def rank2attention(editorialRank=0, autoRank=0):
    '''
    Maps a publication's ranks to an attention value.

    A positive editorialRank takes precedence (unlisted ranks map to 130);
    otherwise autoRank is looked up (unlisted ranks map to 25).
    '''
    if editorialRank > 0:
        table = {1: 3000, 2: 1600, 3: 160, 4: 130, 5: 130}
        rankKey, fallback = editorialRank, 130
    else:
        table = {1: 1400, 2: 475, 3: 200, 4: 85, 5: 50,
                 6: 30, 7: 25, 8: 25, 9: 25, 10: 25}
        rankKey, fallback = autoRank, 25
    return table.get(rankKey, fallback)

def scoreSentenceMentions(sentence, mentionScoreMultiplier):
    '''
    Returns a copy of 'sentence' whose mentions carry
    score = preMentionScore * mentionScoreMultiplier (and theme only).
    '''
    fields = sentence.asDict()
    scored = []
    for mention in fields['mentions']:
        scored.append(Row(theme=mention['theme'],
                          score=mention['preMentionScore'] * mentionScoreMultiplier))
    fields['mentions'] = scored
    return Row(**fields)

def scoreArticleMentions(articleAndMentionScore):
    '''
    Applies the row's mentionScoreMultiplier to every mention of the article.

    Returns the article Row with scored sentences and without the
    mentionScoreMultiplier column.  A row that lacks the column is scored
    with a multiplier of 0.
    '''
    a = articleAndMentionScore.asDict()
    # pop() reads and removes the column in one step, so the default of 0 is
    # actually honoured instead of a later 'del' raising KeyError.
    mentionScoreMultiplier = a.pop('mentionScoreMultiplier', 0)
    a['sentences'] = [scoreSentenceMentions(s, mentionScoreMultiplier) for s in a['sentences']]
    return Row(**a)

def scoreArticles(articles):
    '''
    Scores every mention in 'articles' and returns the scored data frame.

    Registers 'articles' as temp table 'articles' and rank2attention as a SQL
    function, builds 'mentionCountsByPub', then joins the two on publication so
    each article gets
        mentionScoreMultiplier = rank2attention(editorialRank, autoRank) / preMentionScoreSum.
    The result is registered as temp table 'scoredArticles'.
    '''
    try:
        sqlContext.dropTempTable('articles')
    except Exception:
        # Best effort: a failed drop is ignored and registerTempTable below is
        # attempted regardless.  KeyboardInterrupt/SystemExit still propagate.
        pass
    articles.registerTempTable('articles')
    sqlContext.registerFunction('rank2attention', rank2attention, pyspark.sql.types.IntegerType())
    countMentionsByPub(articles)
    scoredArticles = createTable(
        'scoredArticles',
        sqlContext.sql(
            'select a.*, 1.0 * rank2attention(a.editorialRank, a.autoRank) / p.preMentionScoreSum as mentionScoreMultiplier ' +
            'from articles a, mentionCountsByPub p where a.publication=p.publication'
        ).map(scoreArticleMentions))
    return scoredArticles


# In[13]:

def createScoredArticles(articles, outputPath):
    '''
    Scores 'articles', writes the result as parquet to 'outputPath', prints
    the elapsed time and returns the scored data frame.
    '''
    timerStart()
    scored = scoreArticles(articles)
    scored.write.parquet(outputPath)
    timerEnd()
    return scored


# In[16]:

# scoredArticles = createScoredArticles(articles, 'scoredArticles.parquet').cache()

# In[ ]:

# scoredArticles = sqlContext.read.parquet('scoredArticles.parquet').cache()

#scoredArticles = sqlContext.read.option("mergeSchema", "false").parquet('s3://dkddata/2_18ScoredArts.parquet').cache()
# ## Indexing




In [None]:
import time
# One (startKey, endKey) pair per day: 2016-03-01 .. 2016-03-28, each paired
# with the following day.
keyPairs = [('2016-03-{:02d}'.format(i),'2016-03-{:02d}'.format(i+1)) for i in range(1, 29)]
for pair in keyPairs:
    t0 = time.time()
    try:
        startKey = pair[0]
        endKey = pair[1]
        # List the day's keys on the driver, then parse them in parallel.
        keys = list(scanOpeKeys(startKey, endKey))
        pkeys = sc.parallelize(keys,1000)
        parsed = pkeys.map(bundleArticleData)
        #parsed.cache()
        #parsed.count()
        #failedKeys = sqlContext.createDataFrame(parsed.filter(lambda x: 'failedKey' in x.asDict()))
        #failedKeys.write.parquet('s3://***/monthScoring/failedKeys/failedKeys_' + startKey)
        # Keep only rows that bundleArticleData did not flag as failed.
        arts = sqlContext.createDataFrame(parsed.filter(lambda x: 'failed' not in x.asDict()))
        # cache + count materialises the parsed articles before scoring.
        arts.cache()
        arts.count()
        #createScoredArticles(articles,'s3://***/monthScoring/scoredArts/scoredArts_' + startKey)
        scoreArticles(arts).write.parquet('s3://***/monthScoring/scoredArts2/scoredArts_' + startKey)
        #parsed.unpersist()
        arts.unpersist()
        print '{} is done in {} seconds'.format(startKey, time.time()-t0)
    except:
        # Report which day failed, then re-raise so the loop stops.
        print startKey
        raise

---
## Querying OPE

In [84]:
from pyspark.sql import Row
def queryArts(article, query):
    '''
    Generates one Row per sentence of 'article' whose text contains at least
    one of the 'query' terms (case-insensitive substring match).

    Each Row holds the sentence's fields plus every article field except
    'sentences'.  A sentence matching several terms is generated only once, so
    its mentions are not counted repeatedly downstream.
    '''
    meta = article.asDict()
    del meta['sentences']
    # The sentence text is lower-cased below, so the terms must be as well.
    terms = [term.lower() for term in query]
    for sent in article['sentences']:
        sentText = sent['sentenceText'].lower()
        if any(term in sentText for term in terms):
            sentWithMeta = sent.asDict()
            sentWithMeta.update(meta)
            yield Row(**sentWithMeta)

def sentencesToMention(sent):
    '''
    Generates one Row per mention of 'sent': the sentence's other fields plus
    'mention' (the mention's theme) and 'score'.
    '''
    shared = sent.asDict()
    sentenceMentions = shared.pop('mentions')
    for item in sentenceMentions:
        flattened = dict(shared)
        flattened['mention'] = item['theme']
        flattened['score'] = item['score']
        yield Row(**flattened)


# Parquet locations of the daily scored-article outputs, 2016-03-01 .. 2016-03-28.
artDir = ['s3://***/monthScoring/scoredArts2/scoredArts_2016-03-{:02d}'.format(i) for i in range(1, 29)]
# Substrings that queryArts searches for in each sentence's text.
query = ['string 1', 'string 2']

In [None]:
import time
t=time.time()
# Load all daily scored-article parquet outputs as one data frame.
articles = sqlContext.read.option("mergeSchema", "false").parquet(*artDir)
# Keep the sentences that match the query, with article metadata attached.
queriedMentions = articles.flatMap(lambda x: queryArts(x,query))
# Flatten each sentence into one row per mention and persist the result.
mentionsDf = sqlContext.createDataFrame(queriedMentions.coalesce(100).flatMap(sentencesToMention))
mentionsDf.write.parquet('s3://***/monthScoring/contentMarketingScoredMentions')
# Elapsed seconds, shown as the cell's result.
time.time() - t

---
## Twitter Scoring

In [None]:
from pyspark.sql import Row
import json

def parsePersonaDat(row):
    '''
    Parses a tab-separated 'profileId, handle, followerCount' line into a Row.
    Lines without exactly three fields yield Row(failed='incorrectLength', parsed=...).
    The follower count is kept as a string.
    '''
    fields = row.split('\t')
    if len(fields) == 3:
        profileId, handle, followerCount = fields
        return Row(profileId=profileId, handle=handle, followerCount=followerCount)
    return Row(failed='incorrectLength', parsed=fields)

# Parse every follower-count line; rows flagged 'failed' by parsePersonaDat
# are dropped before building the data frame.
data = sc.textFile('s3://***/twitter_follower_counts/*').map(parsePersonaDat)
twits = sqlContext.createDataFrame(data.filter(lambda x: 'failed' not in x.asDict()))

In [None]:
def fluff(flat):
    '''
    Turns a flat dict keyed by '/'-separated paths into nested dicts,
    e.g. {'a/b': 1} -> {'a': {'b': 1}}.
    '''
    tree = {}
    for flatKey in flat:
        segments = flatKey.split('/')
        leaf = segments.pop()
        branch = tree
        # Descend through (and create) the intermediate levels.
        for segment in segments:
            branch = branch.setdefault(segment, {})
        branch[leaf] = flat[flatKey]
    return tree

# Load the tweet JSON lines and un-flatten their '/'-separated keys.
topicTweets = sqlContext.createDataFrame(sc.textFile('s3://***/monthScoring/cmTwitter.json').map(json.loads).map(fluff))
# Add profileId, taken from element 0 of authorId.
topicTweets2 = topicTweets.withColumn('profileId',topicTweets.authorId[0])

In [None]:
# Left outer join keeps every tweet; the duplicate profileId column coming
# from twits is dropped.
topicTweetsWithFollowers = topicTweets2.join(twits, twits.profileId == topicTweets2.profileId, 'left_outer').drop(twits.profileId)
topicTweetsWithFollowers.write.parquet('s3://***/monthScoring/cmTwitterWithFollowers2')

In [1]:
# Reload the joined tweets from the local 'inputs' copy of the parquet output.
topicTweetsWithFollowers = sqlContext.read.option("mergeSchema", "false").parquet('inputs/cmTwitterWithFollowers2')

In [3]:
import ast
def extractTweetsAndScores(row, followerDivisor=18000):
    '''
    Generates one scored mention Row per entry of the tweet's themePayload.

    Every mention of a tweet gets the same score:
        followerCount / (followerDivisor * number of payload entries).
    Tweets with an empty payload generate nothing.
    '''
    themes = ast.literal_eval(row['themePayload'])
    themeCount = len(themes)
    if themeCount == 0:
        return
    perMentionScore = float(row['followerCount'])/(followerDivisor * themeCount)
    day = row['timestamp'][:10]
    text = row['sentenceText']
    tweetAuthor = row['authorText'][0]
    tweetId = row['sentenceId']
    for theme in themes:
        yield Row(mediaType="Twitter", sentenceId=tweetId, date=day, sentenceText=text,
                  author=tweetAuthor, score=perMentionScore,
                  twitterType=theme['SOURCE'], mention=theme['TEXT'])

In [5]:
# Score every tweet mention and persist the result as parquet.
sqlContext.createDataFrame(topicTweetsWithFollowers.flatMap(extractTweetsAndScores)).write.parquet('outputs/cmTwitterScored')

---
## Adjust for daily fluctuations

In [28]:
# Per-publication, per-day publishing counts (read from 'inputs/pubPublishingByDay').
pubPubs = sqlContext.read.option("mergeSchema", "false").parquet('inputs/pubPublishingByDay')

In [30]:
# Mark pubPubs for caching; count() triggers an action and shows the row count.
pubPubs.cache()
pubPubs.count()

1403519

In [43]:
def opeAddDates(row):
    '''Returns 'row' with a 'date' field set to the first 10 characters of articleId.'''
    fields = row.asDict()
    fields.update(date=fields['articleId'][:10])
    return Row(**fields)

# Scored OPE mentions with a 'date' column derived from articleId.
opeRaw = sqlContext.createDataFrame(sqlContext.read.option("mergeSchema", "false").parquet('inputs/contentMarketingScoredMentions').map(opeAddDates))

In [134]:
def getMultiplier(row, numDays = 28):
    '''
    Returns 'row' with a dayMultiplier field and without the pubsArticlesInDay,
    pubArtsInMonth, autoRank, editorialRank and attention fields.

    dayMultiplier is the day's article count divided by the publication's
    daily average over 'numDays'; publications with fewer than 100 articles in
    the month get 1.0.
    '''
    days = float(numDays)
    fields = row.asDict()
    monthCount = fields['pubArtsInMonth']
    if monthCount < 100:
        fields['dayMultiplier'] = 1.0
    else:
        fields['dayMultiplier'] = fields['pubsArticlesInDay'] / (monthCount / days)
    for dropped in ('pubsArticlesInDay', 'pubArtsInMonth', 'autoRank', 'editorialRank', 'attention'):
        fields.pop(dropped)
    return Row(**fields)

In [116]:
# Total pubsArticlesInDay per (publication, mediaType), renamed to pubArtsInMonth.
totPubCounts = pubPubs.groupBy(['publication', 'mediaType']).sum('pubsArticlesInDay').withColumnRenamed('sum(pubsArticlesInDay)','pubArtsInMonth')

In [167]:
# Attach the totals to each daily row and turn them into a dayMultiplier.
pubsCounts = sqlContext.createDataFrame(pubPubs.join(totPubCounts, ['publication', 'mediaType'], 'left_outer').map(getMultiplier))

In [136]:
# Left outer join: mentions with no matching (date, publication, mediaType)
# row keep a null dayMultiplier.
mergedOpePubCount = opeRaw.join(pubsCounts, ['date', 'publication', 'mediaType'], 'left_outer')
mergedOpePubCount.cache()

DataFrame[date: string, publication: string, mediaType: string, articleId: string, author: string, autoRank: bigint, editorialRank: bigint, mention: string, score: double, sentenceId: string, sentenceText: string, title: string, dayMultiplier: double]

In [160]:
def adjustScores(row):
    '''
    Returns 'row' with score multiplied by dayMultiplier and the dayMultiplier
    field removed.  A missing/falsy multiplier is treated as 1.0.
    '''
    fields = row.asDict()
    factor = fields.pop('dayMultiplier') or 1.0
    fields['score'] = fields.pop('score') * factor
    return Row(**fields)

In [166]:
# Apply the day multipliers and persist the adjusted mention scores.
adjustedMentions = sqlContext.createDataFrame(mergedOpePubCount.map(adjustScores))
adjustedMentions.write.parquet('outputs/contentMarketingScoredMentions')

---
## Merge OPE & Twitter

In [2]:
def prepOpe(row):
    '''
    Reshapes an OPE mention row for merging with tweets: sets date from
    articleId, sets mentionType to 'NA', renames mediaType to source and
    drops the two rank fields.
    '''
    fields = row.asDict()
    fields.update(date=fields['articleId'][:10], mentionType='NA')
    fields['source'] = fields.pop('mediaType')
    for rankColumn in ('editorialRank', 'autoRank'):
        fields.pop(rankColumn)
    return Row(**fields)

def prepTwit(row):
    '''
    Reshapes a scored tweet row for merging with OPE mentions: renames
    twitterType to mentionType and mediaType to source, fills articleId and
    title with a 'Tweet, s=<sentenceId>' label and copies author to publication.
    '''
    fields = row.asDict()
    fields['mentionType'] = fields.pop('twitterType')
    fields['source'] = fields.pop('mediaType')
    tweetLabel = 'Tweet, s=' + fields['sentenceId']
    fields['articleId'] = tweetLabel
    fields['title'] = tweetLabel
    fields['publication'] = fields['author']
    return Row(**fields)

#articleId: string, author: string, autoRank: bigint, date: string, editorialRank: bigint, mediaType: string, 
##                        mention: string, publication: string, score: double, sentenceId: string, sentenceText: string, title: string
#                                                mentionType

In [7]:
# Load both scored outputs and bring them to a common set of columns.
twitRaw = sqlContext.createDataFrame(sqlContext.read.option("mergeSchema", "false").parquet('outputs/cmTwitterScored').map(prepTwit))
opeRaw = sqlContext.createDataFrame(sqlContext.read.option("mergeSchema", "false").parquet('outputs/contentMarketingScoredMentions').map(prepOpe))

In [8]:
# Stack the tweet rows and the OPE rows into one data frame.
# NOTE(review): unionAll matches columns by position -- confirm both frames
# list their columns in the same order.
mergedScores = twitRaw.unionAll(opeRaw)

In [171]:
# Collect the merged scores to the driver as a pandas frame and export to Excel.
mergedScores.toPandas().to_excel('cmScoresAdjusted.xlsx', sheet_name = 'Sheet1', index = False)

In [172]:
# Number of merged (tweet + OPE) mention rows.
mergedScores.count()

80217

---
## Look at Scores

In [109]:
# Load the scored OPE mentions, mark them for caching and preview 5 rows.
inputs = sqlContext.read.option("mergeSchema", "false").parquet('inputs/contentMarketingScoredMentions')
inputs.cache()
inputs.show(5)

+--------------------+-------+--------+-------------+---------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|           articleId| author|autoRank|editorialRank|mediaType|             mention|publication|               score|          sentenceId|        sentenceText|               title|
+--------------------+-------+--------+-------------+---------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|2016-03-01-14-13-...|Unknown|     -10|            3|     news|      store analytic|   BioSpace|0.001055191598905239|2016-03-01-14-13-...|With our comprehe...|69% of Shoppers W...|
|2016-03-01-14-13-...|Unknown|     -10|            3|     news|    store investment|   BioSpace|0.001055191598905239|2016-03-01-14-13-...|With our comprehe...|69% of Shoppers W...|
|2016-03-01-14-13-...|Unknown|     -10|            3|     news|      content market|   BioSpace

In [110]:
# Number of scored OPE mention rows.
inputs.count()

62672

In [111]:
# Collect the scored mentions to the driver and export them to 'cmScoredMentions.xls'.
inputs.toPandas().to_excel('cmScoredMentions' + '.xls', sheet_name = 'Sheet1', index = False)

In [116]:
from pyspark.sql import Row
def addDate(mention):
    '''Returns 'mention' with a 'date' field set to the first 10 characters of sentenceId.'''
    fields = mention.asDict()
    sentenceId = fields.get('sentenceId')
    fields['date'] = sentenceId[:10]
    return Row(**fields)

In [117]:
# Scored mentions with a 'date' column derived from sentenceId.
mentions = sqlContext.createDataFrame(inputs.map(addDate))
mentions.cache()

In [119]:
# Preview the first 5 dated mention rows.
mentions.show(5)

+--------------------+-------+--------+----------+-------------+---------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|           articleId| author|autoRank|      date|editorialRank|mediaType|             mention|publication|               score|          sentenceId|        sentenceText|               title|
+--------------------+-------+--------+----------+-------------+---------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|2016-03-01-14-13-...|Unknown|     -10|2016-03-01|            3|     news|      store analytic|   BioSpace|0.001055191598905239|2016-03-01-14-13-...|With our comprehe...|69% of Shoppers W...|
|2016-03-01-14-13-...|Unknown|     -10|2016-03-01|            3|     news|    store investment|   BioSpace|0.001055191598905239|2016-03-01-14-13-...|With our comprehe...|69% of Shoppers W...|
|2016-03-01-14-13-...|Unknown|     -10|2

In [120]:
# Total score per date (column renamed to dateScore); show up to 30 rows.
mentions.groupBy('date').sum('score').withColumnRenamed('sum(score)','dateScore').show(30)

+----------+------------------+
|      date|         dateScore|
+----------+------------------+
|2016-03-01| 665.6508999256231|
|2016-03-02|  846.961384535139|
|2016-03-03| 909.0376272354772|
|2016-03-04| 593.9498856296218|
|2016-03-05| 91.41313889301027|
|2016-03-06| 111.7143222887835|
|2016-03-07| 900.5253512237755|
|2016-03-08| 799.8537637547616|
|2016-03-09|1206.3520678409448|
|2016-03-10|  676.203058148176|
|2016-03-11| 733.0035486823363|
|2016-03-12| 90.24831020583008|
|2016-03-13|200.28356574169186|
|2016-03-14| 504.4277031801616|
|2016-03-15|1385.7246504885918|
|2016-03-16|1227.9653965832435|
|2016-03-17|1341.9984840220375|
|2016-03-18| 714.4126330280028|
|2016-03-19|200.82049866570136|
|2016-03-20| 280.6937741884606|
|2016-03-21| 606.6077094157159|
|2016-03-22|1375.8682602446008|
|2016-03-23|2430.0988696698714|
|2016-03-24| 749.7466996086685|
|2016-03-25|1258.1656137078212|
|2016-03-26| 517.5927715153019|
|2016-03-27|  81.8523662813033|
|2016-03-28| 865.1212742786892|
+-------

In [202]:
from pyspark.sql.functions import desc, rank
# Total score per mention, ranked by descending total; only ranks <= 15 are kept.
# NOTE(review): Window is not imported in the visible lines -- presumably
# 'from pyspark.sql.window import Window'; confirm.
ranks = mentions.groupBy('mention').sum('score').withColumnRenamed('sum(score)','overalScore').withColumn('overalRank',rank().over(Window.partitionBy().orderBy(desc('overalScore')))).filter('overalRank <= 15')
# Score per (date, mention).
dateMentions = mentions.groupBy(['date','mention']).sum('score').withColumnRenamed('sum(score)','score')

In [206]:
# For the top-ranked mentions, attach their per-date scores and rank them
# within each date by descending score.
dayRanks = ranks.join(dateMentions, 'mention', 'left_outer').withColumn('dayRank',rank().over(Window.partitionBy('date').orderBy(desc('score'))))

+--------------------+------------------+----------+----------+-------------------+-------+
|             mention|       overalScore|overalRank|      date|              score|dayRank|
+--------------------+------------------+----------+----------+-------------------+-------+
|      content market|3457.4538754036957|         1|2016-03-01|  82.68577399059035|      1|
|           advertise| 287.8561747449625|         3|2016-03-01| 18.690894383073065|      2|
|             content|161.12239840935393|         8|2016-03-01|  9.223506605259598|      3|
|            marketer|126.85160943878743|        12|2016-03-01|  8.050745761075522|      4|
|content market ef...|125.92245256747059|        13|2016-03-01|  7.725669440442079|      5|
|Content Marketing...|120.89069726715745|        14|2016-03-01| 7.1719926980333355|      6|
|    native advertise| 485.1678118247768|         2|2016-03-01|  6.665304878956423|      7|
|content market st...|  251.370269509646|         4|2016-03-01|  6.4522261006763

---
## ScratchCode