-
Notifications
You must be signed in to change notification settings - Fork 254
/
extractor.py
executable file
·173 lines (147 loc) · 5.65 KB
/
extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python3
from datetime import datetime
import json
import os
import re
import argparse
import csv
import copy
import sys
import gzip
strptime = datetime.strptime
class attriObject:
    """One colon-separated attribute path, e.g. "user:screen_name"."""

    def __init__(self, string):
        # Path components in lookup order; the final component names the CSV column.
        self.value = string.split(":")
        self.title = self.value[-1]

    def getElement(self, json_object):
        """Walk the attribute path through *json_object*; return matches as a list."""
        matches = [json_object]
        for key in self.value:
            # Bound is captured up front: if a single match expands into a list
            # below, we must not keep walking into the expanded elements on
            # this pass — the next key handles them.
            width = len(matches)
            position = 0
            while position < width:
                try:
                    matches[position] = matches[position][key]
                except (TypeError, KeyError):
                    print(
                        "'{0}' is not a valid json entry.".format(":".join(self.value))
                    )
                    sys.exit()
                # A list fans out the search; nested lists are unsupported.
                if isinstance(matches[position], list):
                    if len(matches) > 1:
                        raise Exception(
                            "Extractor currently does not handle nested lists."
                        )
                    matches = matches[position]
                position += 1
        return matches
def tweets_files(string, path):
    """Yield (filepath, opener) pairs for tweet files in *path*.

    A file qualifies when its name matches the *string* regular expression
    and ends in .jsonl (optionally gzip-compressed as .jsonl.gz). The
    opener is gzip.open for compressed files, the builtin open otherwise.
    """
    for filename in os.listdir(path):
        # endswith() is stricter than the previous substring test, which
        # also matched names like "x.jsonl.bak".
        if re.match(string, filename) and filename.endswith((".jsonl", ".jsonl.gz")):
            opener = gzip.open if filename.endswith(".gz") else open
            # Caller guarantees *path* ends with "/" (see __main__), so
            # plain concatenation yields a valid filepath.
            yield path + filename, opener
def parse(args):
    """Parse every matching tweet file and write extracted attributes as CSV.

    Skips duplicate tweet ids, tweets outside the optional start/end window,
    and tweets missing the optional hashtag. Prints a summary when done.
    """
    # newline="" is required by the csv module so row endings are not
    # translated twice (avoids blank rows on Windows).
    with open(args.output, "w", newline="", encoding="utf-8") as output:
        csv_writer = csv.writer(output, dialect=args.dialect)
        # Header row: one column per requested attribute.
        csv_writer.writerow([a.title for a in args.attributes])
        count = 0
        tweets = set()  # ids seen so far, for de-duplication across files
        for filename, f in tweets_files(args.string, args.path):
            print("parsing", filename)
            with f(filename, "rb") as data_file:
                for line in data_file:
                    try:
                        json_object = json.loads(line.decode("utf-8"))
                    except ValueError:
                        # Truncated/corrupt line: report it, keep parsing.
                        print("Error in", filename, "entry incomplete.")
                        continue
                    # Check for duplicates.
                    identity = json_object["id"]
                    if identity in tweets:
                        continue
                    tweets.add(identity)
                    # Check for time restrictions (Twitter created_at format).
                    if args.start or args.end:
                        tweet_time = strptime(
                            json_object["created_at"], "%a %b %d %H:%M:%S +0000 %Y"
                        )
                        if args.start and args.start > tweet_time:
                            continue
                        if args.end and args.end < tweet_time:
                            continue
                    # Check for hashtag (args.hashtag is already lowercased).
                    if args.hashtag:
                        for entity in json_object["entities"]["hashtags"]:
                            if entity["text"].lower() == args.hashtag:
                                break
                        else:
                            continue
                    count += extract(json_object, args, csv_writer)
    print("Searched", len(tweets), "tweets and recorded", count, "items.")
    # Guard: max() on an empty set raises ValueError when no tweets matched.
    if tweets:
        print("largest id:", max(tweets))
def extract(json_object, args, csv_writer):
    """Extract the requested attributes from one tweet and write CSV rows.

    An attribute that resolves to several values fans out into one row per
    combination (cartesian product with the rows built so far). A missing
    attribute or a None value becomes "NA". Returns the number of rows
    written.
    """
    rows = [[]]
    for attribute in args.attributes:
        item = attribute.getElement(json_object)
        if len(item) == 0:
            # Nothing found: pad every pending row with a placeholder.
            for row in rows:
                row.append("NA")
        else:
            # Cross every pending row with every value for this attribute,
            # in value-major order. Building fresh lists with `row + [...]`
            # replaces the original per-value copy.deepcopy, which deep-
            # copied already-written cells at much higher cost for the
            # same result.
            rows = [
                row + ["NA" if value is None else value]
                for value in item
                for row in rows
            ]
    for row in rows:
        csv_writer.writerow(row)
    return len(rows)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Extracts attributes from tweets.")
    parser.add_argument(
        "attributes",
        nargs="*",
        # Fixed garbled wording and "seperated" typo in the original help text.
        help="Attributes to search for. Attributes nested inside other attributes should be separated by a colon. Example: user:screen_name, entities:hashtags:text.",
    )
    parser.add_argument(
        "-dialect",
        default="excel",
        help="Sets dialect for csv output. Defaults to excel. See python module csv.list_dialects()",
    )
    parser.add_argument(
        "-string",
        default="",
        help="Regular expression for files to parse. Defaults to empty string.",
    )
    parser.add_argument(
        "-path",
        default="./",
        help="Optional path to folder containing tweets. Defaults to current folder.",
    )
    parser.add_argument(
        "-output",
        default="output.csv",
        help="Optional file to output results. Defaults to output.csv.",
    )
    parser.add_argument(
        "-start", default="", help="Define start date for tweets. Format (mm:dd:yyyy)"
    )
    parser.add_argument(
        "-end", default="", help="Define end date for tweets. Format (mm:dd:yyyy)"
    )
    parser.add_argument(
        "-hashtag", default="", help="Define a hashtag that must be in parsed tweets."
    )
    args = parser.parse_args()
    # Normalize the folder path so tweets_files can simply concatenate names.
    if not args.path.endswith("/"):
        args.path += "/"
    # False (not None) is the "no bound" sentinel the filters in parse() expect.
    args.start = strptime(args.start, "%m:%d:%Y") if args.start else False
    args.end = strptime(args.end, "%m:%d:%Y") if args.end else False
    args.attributes = [attriObject(i) for i in args.attributes]
    args.string = re.compile(args.string)
    args.hashtag = args.hashtag.lower()  # hashtag comparison is case-insensitive
    parse(args)