Permalink
Browse files

initial commit

  • Loading branch information...
Paul Petring
Paul Petring committed Jul 19, 2016
1 parent 09e592f commit bffa5613f3a7af0b88d98008abd743f0a8280600
Showing with 552 additions and 0 deletions.
  1. +216 −0 FritzBox.py
  2. +140 −0 GoogleCalendarWrapper.py
  3. +67 −0 README.md
  4. 0 __init__.py
  5. +1 −0 calendar_default.json
  6. +1 −0 client_secret_default.json
  7. +127 −0 main_default.py
View
@@ -0,0 +1,216 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @author Paul Petring
from pprint import pprint
from lxml import etree
import hashlib
import requests
import json
import datetime
from datetime import datetime, timedelta
import pytz
class FritzBox:
"""
allows to interact with Fritzbox OS 6.30 - 6.60 by using HTTP requests
"""
_url="" #URL of the FritzBox
_username="" #username of the FritzBox
_password="" #password of the FritzBox
_sid="" #current session identifier
_last_calls=[] #calls buffer
_last_devices=[] #devices buffer
_request_session = requests.Session() #request session object
_request_headers = { #default headers, feel free to modify these
'Referer': 'http://fritz.box/',
'Pragma' : 'no-cache',
'User-Agent': 'User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36',
'Accept-Language': 'en-US,en;q=0.8,de;q=0.6',
'Accept-Encoding': 'gzip, deflate, sdch',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
def __init__(self,password="",url="http://fritz.box",username="",login=True):
"""
@param password of your fritz.box (no username support yet)
@param url of your fritzbox (defaults to "http://fritz.box")
"""
self._url = url
self._password = password
self._username = username
if(login):
self._sid = self.login()
if(self._url!="http://fritz.box"):
self._request_headers["Referer"] = self._url + "/"
def getSID(self):
""" returnes current SID status and challenge
required for logging into the FritzBox
"""
status_url = self._url + "/login_sid.lua"
r = self._request_session.get(status_url)
"""Expected response:
<?xml version="1.0" encoding="utf-8"?>
<SessionInfo>
<SID>0000000000000000</SID>
<Challenge>443a0e07</Challenge>
<BlockTime>0</BlockTime>
<Rights></Rights>
</SessionInfo>
"""
#Parsing XML
parser = etree.XMLParser(recover=True)
root = etree.fromstring(str(r.content), parser=parser)
ret_sid=root.find('SID').text
challenge=root.find('Challenge').text
return (ret_sid,challenge)
def login(self):
"""
performs an login by getting the FirtzBox session challenge
hashing it in combination with the provided password and
returning the newly achieved session identifier
"""
sid_status = self.getSID()
self._sid = sid_status[0]
challenge = sid_status[1]
if(sid_status[0]=="0000000000000000"): # login procedure required
#following the login javascript of OS 6.30+
#encoding it to utf-16E does the trick to get the correct hash
cp_str = challenge + "-" + self._password
md5_str = hashlib.md5(cp_str.encode("utf-16LE")).hexdigest()
response = challenge + "-" + md5_str
#preparing POST statement
post = "response="+response+"&lp=&username="+self._username
data = dict(response=response, lp='',username=self._username)
r = self._request_session.post(self._url, data=data, allow_redirects=True,headers=self._request_headers)
#extracting SID from response (mostly in the last few lines of response)
self._sid = r.content[r.content.find('"sid":'):]
self._sid = self._sid[8:self._sid.find("});")-2]
return self._sid;
def get_devices(self,device_type="active"):
"""
returns a list of the current home network devices as FritzBoxDevice objects
@device_type defaults to active devices, else returns inactive (passive) devices of home network
"""
data = dict(xhr=1, sid=self._sid, lang='en',page='netDev',type="cleanup")
r = self._request_session.post(self._url+"/data.lua", data=data, allow_redirects=True,headers=self._request_headers)
#r.content should contain valid json string with active and passive devices
parsed=json.loads(r.content)
ret_list=[]
if(device_type=="active"):
for active in parsed["data"]["active"]:
ret_list.append(FritzBoxDevice(active))
else:
for passive in parsed["data"]["passive"]:
ret_list.append(FritzBoxDevice(passive))
return ret_list
def get_foncalls(self):
"""
returns a list of last 400(?) fon calls as FritzBoxCall objects
"""
data = dict(sid=self._sid)
r = self._request_session.post(self._url+"/fon_num/foncalls_list.lua?sid="+self._sid+"&csv=", data=data, allow_redirects=True,headers=self._request_headers)
#r.content contains semicolon separated values, surrounding head and tail line
ret_list = []
for line in r.content.split('\n')[2:-1]:
ret_list.append(FritzBoxCall(line))
_last_calls = ret_list
return ret_list
class FritzBoxCall:
call_type="" #int
date="" #dateTime
caller_name="" #name of the caller set by FritzBox fon book
caller_number="" #number of the caller as string, as it can be anonymous
fon="" #name of the called internal device
number="" #number of the called internal devices
duration="" #duration as python timespan
UID="" #unique identifier of the call
def __init__(self,csv_line):
parts=csv_line.split(';')
self.call_type = int(parts[0])
parse_date = datetime.now(pytz.timezone('Europe/Berlin'))
tz = pytz.timezone('Europe/Berlin')
self.date = parse_date.strptime(parts[1] + " CET", "%d.%m.%y %H:%M %Z")
tzoffset = tz.utcoffset(self.date)
self.date = self.date-tzoffset
self.caller_name = parts[2]
self.caller_number = parts[3]
self.fon = parts[4]
self.number = parts[5]
t = datetime.strptime(parts[6],"%H:%M")
self.duration = timedelta(hours=t.hour, minutes=t.minute, seconds=t.second)
self.UID = self.get_UID()
def get_UID(self):
return hashlib.md5(self.date.isoformat()+self.caller_number).hexdigest()
def __repr__(self): #debug purposes
return str(self.date) + " " +self.caller_name + " " +self.caller_number + " " +str(self.duration)
def __str__(self): #debug purposes
return str(self.date) + " " +self.caller_name + " " +self.caller_number + " " +str(self.duration)
class FritzBoxDevice:
mac="" #mac adress as string
ipv6="" #ipv6 adress of the device as string
state="" # state as string
name="" # name as string
port="" # port as string
summarypropertie="" # summarypropertie as string (no typo! missing r is real)
classes="" # classes as string
url="" # url as string
type="" # type as string (lan|wlan etc)
ipv4="" # ipv4 as string
UID="" #UID as string
def __init__(self,parsed_json):
"""
expected parsed json and inits values as string
"""
self.mac=parsed_json["mac"]
self.ipv6=parsed_json["ipv6"]
self.UID=parsed_json["UID"]
self.state=parsed_json["state"]
self.port=parsed_json["port"]
self.name=parsed_json["name"]
self.summarypropertie=parsed_json["summarypropertie"]
self.classes=parsed_json["classes"]
self.url=parsed_json["url"]
self.type=parsed_json["type"]
self.ipv4=parsed_json["ipv4"]
self.UID=self.get_UID()
def get_UID(self):
if self.UID:
return self.UID
return str(self) #if vpn UID seems to be empty
def __repr__(self): #debug purposes
return self.UID + " " +self.ipv4 + " " +self.type + " " +self.name
def __str__(self): #debug purposes
return self.UID + " " +self.ipv4 + " " +self.type + " " +self.name
View
@@ -0,0 +1,140 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @author Paul Petring
import sys
import imaplib
import getpass
import email
import email.header
import datetime
from datetime import datetime, timedelta
import time
from pprint import pprint
import httplib2
import os
from apiclient import discovery
import oauth2client
from oauth2client import client
from oauth2client import tools
class GoogleCalendarWrapper:
_calendarID = ""
SCOPES = 'https://www.googleapis.com/auth/calendar' #' # do not use .readonly' and reset the key in case you already did
CLIENT_SECRET_FILE = './client_secret.json'
APPLICATION_NAME = 'Google Calendar API Python Quickstart'
credentials = ""
http = ""
service = ""
def __init__(self,calendarID):
self._calendarID = calendarID
self.credentials = self.get_credentials()
self.http = self.credentials.authorize(httplib2.Http())
self.service = discovery.build('calendar', 'v3', http=self.http)
def get_credentials(self):
"""Gets valid user credentials from storage.
If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.
Returns:
Credentials, the obtained credential.
"""
current_dir = os.path.expanduser('.')
credential_path = os.path.join(current_dir, 'calendar.json')
store = oauth2client.file.Storage(credential_path)
credentials = store.get()
if not credentials or credentials.invalid:
flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
flow.user_agent = APPLICATION_NAME
if flags:
credentials = tools.run_flow(flow, store, flags)
else: # Needed only for compatibility with Python 2.6
credentials = tools.run(flow, store)
print('Storing credentials to ' + credential_path)
return credentials
def get_events(self,days=365,maxResults=10000):
"""
gets recent calendar entries from calendar
"""
now = (datetime.utcnow() - timedelta(days=days)).isoformat() + 'Z'
eventsResult = self.service.events().list(
calendarId=self._calendarID, timeMin=now, maxResults=maxResults, singleEvents=True,
orderBy='startTime').execute()
ret_list = eventsResult.get('items', [])
for item in ret_list:
#writing the current UID to the description field to identify the event later on
#might be improved to search for an UID line like UID: identifer to be able to use
#the description for other purposes too
if "description" in item.keys():
item["UID"] = item["description"] #adding UID due to missing CustomCalendarClass
else:
pprint(item)
return ret_list
def provide_empty_event():
return { 'summary': '',
'location': '',
'description': '',
'start': {
'dateTime': datetime.utcnow().isoformat() + 'Z',
'timeZone': 'Europe/Berlin',
},
'end': {
'dateTime': (datetime.utcnow() + timedelta(minutes=2)).isoformat() + 'Z',
'timeZone': 'Europe/Berlin',
},
'recurrence': [
'' #'RRULE:FREQ=DAILY;COUNT=2'
],
'attendees': [
# {'email': 'mail@example.com'},
],
'reminders': {
#'useDefault': False,
#'overrides': [
# {'method': 'email', 'minutes': 24 * 60},
# {'method': 'popup', 'minutes': 10},
# ],
}
}
def create_event(self,event):
event = self.service.events().insert(calendarId=self._calendarID, body=event).execute()
return
def update_event(self,event_to_be_handled):
try:
# etrieve the existing event from the API.
event = self.service.events().get(calendarId=self._calendarID, eventId=event_to_be_handled["id"]).execute()
#only updating provided elements
for key in event_to_be_handled.keys():
event[key] = event_to_be_handled[key]
#trigger update
updated_event = self.service.events().update(calendarId=self._calendarID, eventId=event_to_be_handled["id"], body=event).execute()
except Exception,e:
print "error while updating"
print e
print event_to_be_handled
print event
pass
return
Oops, something went wrong.

0 comments on commit bffa561

Please sign in to comment.